Skip to content

Commit c5c7fce

Browse files
authored (author name lost in page extraction)
refactor: refine replace into by caching individual BlockMeta (#17368)
1 parent 5581d25 commit c5c7fce

File tree

3 files changed

+51
-20
lines changed

3 files changed

+51
-20
lines changed

src/query/storages/common/cache/src/caches.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>;
4444
pub type SegmentBlockMetasCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;
4545

4646
/// In-memory cache of individual BlockMeta.
47-
pub type BlockMetaCache = InMemoryLruCache<Arc<BlockMeta>>;
47+
pub type BlockMetaCache = InMemoryLruCache<BlockMeta>;
4848

4949
/// In memory object cache of TableSnapshot
5050
pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
@@ -186,8 +186,8 @@ impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> {
186186
}
187187
}
188188

189-
impl From<Arc<BlockMeta>> for CacheValue<Arc<BlockMeta>> {
190-
fn from(value: Arc<BlockMeta>) -> Self {
189+
impl From<BlockMeta> for CacheValue<BlockMeta> {
190+
fn from(value: BlockMeta) -> Self {
191191
CacheValue {
192192
inner: Arc::new(value),
193193
mem_bytes: 0,

src/query/storages/common/cache/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod temp_dir;
2525

2626
pub use cache::CacheAccessor;
2727
pub use cache::Unit;
28+
pub use caches::BlockMetaCache;
2829
pub use caches::CacheValue;
2930
pub use caches::CachedObject;
3031
pub use caches::SegmentBlockMetasCache;

src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ use databend_common_metrics::storage::*;
4343
use databend_common_sql::evaluator::BlockOperator;
4444
use databend_common_sql::executor::physical_plans::OnConflictField;
4545
use databend_common_sql::StreamContext;
46+
use databend_storages_common_cache::BlockMetaCache;
47+
use databend_storages_common_cache::CacheAccessor;
48+
use databend_storages_common_cache::CacheManager;
4649
use databend_storages_common_cache::LoadParams;
4750
use databend_storages_common_index::filters::Filter;
4851
use databend_storages_common_index::filters::Xor8Filter;
@@ -100,6 +103,8 @@ struct AggregationContext {
100103
io_request_semaphore: Arc<Semaphore>,
101104
// generate stream columns if necessary
102105
stream_ctx: Option<StreamContext>,
106+
107+
block_meta_cache: Option<BlockMetaCache>,
103108
}
104109

105110
// Apply MergeIntoOperations to segments
@@ -209,6 +214,7 @@ impl ReplaceIntoOperationAggregator {
209214
block_builder,
210215
io_request_semaphore,
211216
stream_ctx,
217+
block_meta_cache: CacheManager::instance().get_block_meta_cache(),
212218
}),
213219
})
214220
}
@@ -291,6 +297,8 @@ impl ReplaceIntoOperationAggregator {
291297
impl ReplaceIntoOperationAggregator {
292298
#[async_backtrace::framed]
293299
pub async fn apply(&mut self) -> Result<Option<MutationLogs>> {
300+
let block_meta_cache = &self.aggregation_ctx.block_meta_cache;
301+
294302
metrics_inc_replace_number_apply_deletion();
295303

296304
// track number of segments and blocks after pruning (per merge action application)
@@ -317,7 +325,7 @@ impl ReplaceIntoOperationAggregator {
317325
let mut mutation_log_handlers = Vec::new();
318326
let mut num_rows_mutated = 0;
319327
for (segment_idx, block_deletion) in self.deletion_accumulator.deletions.drain() {
320-
let (path, ver) = self
328+
let (segment_path, ver) = self
321329
.aggregation_ctx
322330
.segment_locations
323331
.get(&segment_idx)
@@ -329,19 +337,41 @@ impl ReplaceIntoOperationAggregator {
329337
})?;
330338

331339
let load_param = LoadParams {
332-
location: path.clone(),
340+
location: segment_path.clone(),
333341
len_hint: None,
334342
ver: *ver,
335343
put_cache: true,
336344
};
337345

338-
let compact_segment_info = aggregation_ctx.segment_reader.read(&load_param).await?;
339-
let segment_info: SegmentInfo = compact_segment_info.try_into()?;
346+
// Retain SegmentInfo to avoid repeatedly extracting it from CompactSegmentInfo later.
347+
let mut opt_segment_info: Option<SegmentInfo> = None;
340348

341349
for (block_index, keys) in block_deletion {
350+
let block_cache_key = format!("{segment_path}-{block_index}");
351+
let block_meta = match block_meta_cache.get(&block_cache_key) {
352+
Some(block_meta) => block_meta,
353+
None => {
354+
let block_meta = if let Some(segment_info) = &opt_segment_info {
355+
segment_info.blocks[block_index].clone()
356+
} else {
357+
let compact_segment_info =
358+
aggregation_ctx.segment_reader.read(&load_param).await?;
359+
let segment_info: SegmentInfo = compact_segment_info.try_into()?;
360+
let block_meta = segment_info.blocks[block_index].clone();
361+
opt_segment_info = Some(segment_info);
362+
block_meta
363+
};
364+
// A query node typically processes only a subset of the BlockMeta in a given segment.
365+
// Therefore, even though all BlockMeta of a segment are available here, not all are populated into the cache.
366+
block_meta_cache.insert(block_cache_key, block_meta.as_ref().clone());
367+
block_meta
368+
}
369+
};
370+
342371
let permit =
343372
acquire_task_permit(aggregation_ctx.io_request_semaphore.clone()).await?;
344-
let block_meta = segment_info.blocks[block_index].clone();
373+
374+
// let block_meta = segment_info.blocks[block_index].clone();
345375
let aggregation_ctx = aggregation_ctx.clone();
346376
num_rows_mutated += block_meta.row_count;
347377
// self.aggregation_ctx.
@@ -604,7 +634,7 @@ impl AggregationContext {
604634
if let Some(stats) = column_stats {
605635
let max = stats.max();
606636
let min = stats.min();
607-
std::cmp::min(key_max, max) >= std::cmp::max(key_min,min)
637+
std::cmp::min(key_max, max) >= std::cmp::max(key_min, min)
608638
|| // coincide overlap
609639
(max == key_max && min == key_min)
610640
} else {
@@ -630,22 +660,22 @@ impl AggregationContext {
630660
let reader = reader.clone();
631661
GlobalIORuntime::instance()
632662
.spawn(async move {
633-
let column_chunks = merged_io_read_result.columns_chunks()?;
634-
reader.deserialize_chunks(
635-
block_meta_ptr.location.0.as_str(),
636-
block_meta_ptr.row_count as usize,
637-
&block_meta_ptr.compression,
638-
&block_meta_ptr.col_metas,
639-
column_chunks,
640-
&storage_format,
641-
)
642-
})
663+
let column_chunks = merged_io_read_result.columns_chunks()?;
664+
reader.deserialize_chunks(
665+
block_meta_ptr.location.0.as_str(),
666+
block_meta_ptr.row_count as usize,
667+
&block_meta_ptr.compression,
668+
&block_meta_ptr.col_metas,
669+
column_chunks,
670+
&storage_format,
671+
)
672+
})
643673
.await
644674
.map_err(|e| {
645675
ErrorCode::Internal(
646676
"unexpected, failed to join aggregation context read block tasks for replace into.",
647677
)
648-
.add_message_back(e.to_string())
678+
.add_message_back(e.to_string())
649679
})?
650680
}
651681

0 commit comments

Comments (0)