Skip to content

Commit 3e0915f

Browse files
committed
chore: Merge the logic that triggers automatic compaction upon write
1 parent 3cbcfcc commit 3e0915f

File tree

8 files changed

+55
-47
lines changed

8 files changed

+55
-47
lines changed

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ pub struct CompactTargetTableDescription {
4747
pub catalog: String,
4848
pub database: String,
4949
pub table: String,
50-
pub mutation_kind: MutationKind,
5150
}
5251

5352
pub struct CompactHookTraceCtx {
@@ -92,25 +91,14 @@ async fn do_hook_compact(
9291
info!("[COMPACT-HOOK] Operation {op_name} completed successfully, starting table optimization job.");
9392

9493
let compact_start_at = Instant::now();
95-
let compaction_limits = match compact_target.mutation_kind {
96-
MutationKind::Insert => {
97-
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
98-
info!("[COMPACT-HOOK] Table {} requires compaction of {} blocks", compact_target.table, compaction_num_block_hint);
99-
if compaction_num_block_hint == 0 {
100-
return Ok(());
101-
}
102-
CompactionLimits {
103-
segment_limit: None,
104-
block_limit: Some(compaction_num_block_hint as usize),
105-
}
106-
}
107-
_ => {
108-
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
109-
CompactionLimits {
110-
segment_limit: Some(auto_compaction_segments_limit as usize),
111-
block_limit: None,
112-
}
113-
}
94+
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
95+
info!("[COMPACT-HOOK] Table {} requires compaction of {} blocks", compact_target.table, compaction_num_block_hint);
96+
if compaction_num_block_hint == 0 {
97+
return Ok(());
98+
}
99+
let compaction_limits = CompactionLimits {
100+
segment_limit: None,
101+
block_limit: Some(compaction_num_block_hint as usize),
114102
};
115103

116104
// keep the original progress value

src/query/service/src/interpreters/hook/hook.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ impl HookOperator {
9696
catalog: self.catalog.to_owned(),
9797
database: self.database.to_owned(),
9898
table: self.table.to_owned(),
99-
mutation_kind: self.mutation_kind,
10099
};
101100

102101
let trace_ctx = CompactHookTraceCtx {

src/query/settings/src/settings_default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ impl DefaultSettings {
892892
}),
893893
("auto_compaction_segments_limit", DefaultSettingValue {
894894
value: UserSettingValue::UInt64(3),
895-
desc: "The maximum number of segments that can be compacted automatically triggered after write(replace-into/merge-into).",
895+
desc: "The maximum number of segments that can be reclustered automatically triggered after write.",
896896
mode: SettingMode::Both,
897897
scope: SettingScope::Both,
898898
range: Some(SettingRange::Numeric(2..=u64::MAX)),

src/query/storages/fuse/src/operations/common/generators/append_generator.rs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use databend_storages_common_table_meta::meta::ColumnStatistics;
3030
use databend_storages_common_table_meta::meta::Statistics;
3131
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
3232
use databend_storages_common_table_meta::meta::TableSnapshot;
33-
use log::info;
3433
use log::warn;
3534

3635
use crate::operations::common::ConflictResolveContext;
@@ -41,7 +40,8 @@ use crate::statistics::TableStatsGenerator;
4140

4241
#[derive(Clone)]
4342
pub struct AppendGenerator {
44-
ctx: Arc<dyn TableContext>,
43+
pub(crate) ctx: Arc<dyn TableContext>,
44+
4545
leaf_default_values: HashMap<ColumnId, Scalar>,
4646
overwrite: bool,
4747
conflict_resolve_ctx: ConflictResolveContext,
@@ -194,28 +194,6 @@ impl SnapshotGenerator for AppendGenerator {
194194
}
195195
}
196196

197-
// check if need to auto compact
198-
// the algorithm is: if the number of imperfect blocks is greater than the threshold, then auto compact.
199-
// the threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 25.
200-
let imperfect_count = new_summary.block_count - new_summary.perfect_block_count;
201-
let auto_compaction_imperfect_blocks_threshold = self
202-
.ctx
203-
.get_settings()
204-
.get_auto_compaction_imperfect_blocks_threshold()?;
205-
206-
if imperfect_count >= auto_compaction_imperfect_blocks_threshold {
207-
// If imperfect_count is larger, SLIGHTLY increase the number of blocks
208-
// eligible for auto-compaction, this adjustment is intended to help reduce
209-
// fragmentation over time.
210-
let compact_num_block_hint = std::cmp::min(
211-
imperfect_count,
212-
(auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64,
213-
);
214-
info!("set compact_num_block_hint to {compact_num_block_hint }");
215-
self.ctx
216-
.set_compaction_num_block_hint(table_info.name.as_str(), compact_num_block_hint);
217-
}
218-
219197
// merge statistics will set the additional_stats_meta to none,
220198
// so reset additional_stats_meta here.
221199
let table_statistics_location = table_stats_gen.table_statistics_location();

src/query/storages/fuse/src/operations/common/generators/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ pub use conflict_resolve_context::SnapshotChanges;
2424
pub use conflict_resolve_context::SnapshotMerged;
2525
pub use mutation_generator::MutationGenerator;
2626
pub use snapshot_generator::decorate_snapshot;
27+
pub(crate) use snapshot_generator::set_compaction_num_block_hint;
2728
pub use snapshot_generator::SnapshotGenerator;
2829
pub use truncate_generator::TruncateGenerator;

src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
use std::any::Any;
1616
use std::sync::Arc;
1717

18+
use databend_common_catalog::table_context::TableContext;
1819
use databend_common_exception::Result;
1920
use databend_common_expression::TableSchema;
2021
use databend_common_meta_app::schema::TableInfo;
2122
use databend_storages_common_session::TxnManagerRef;
23+
use databend_storages_common_table_meta::meta::Statistics;
2224
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
2325
use databend_storages_common_table_meta::meta::TableSnapshot;
26+
use log::info;
2427

2528
use crate::operations::common::ConflictResolveContext;
2629
use crate::statistics::TableStatsGenerator;
@@ -92,3 +95,29 @@ pub fn decorate_snapshot(
9295
}
9396
Ok(())
9497
}
98+
99+
pub(crate) fn set_compaction_num_block_hint(
100+
ctx: &dyn TableContext,
101+
table_name: &str,
102+
summary: &Statistics,
103+
) -> Result<()> {
104+
// Decide from `summary` whether auto compaction is needed for `table_name`; if so, record a block-count hint on `ctx` (read later by the compact hook, presumably — confirm against the hook code).
105+
// The algorithm is: if the number of imperfect blocks (block_count - perfect_block_count) is greater than or equal to the threshold, then auto compact.
106+
// The threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 25. An error is returned only if reading that setting fails.
107+
let imperfect_count = summary.block_count - summary.perfect_block_count;
108+
let auto_compaction_imperfect_blocks_threshold = ctx
109+
.get_settings()
110+
.get_auto_compaction_imperfect_blocks_threshold()?;
111+
if imperfect_count >= auto_compaction_imperfect_blocks_threshold {
112+
// The hint is min(imperfect_count, ceil(1.5 * threshold)): when imperfect_count exceeds the threshold, SLIGHTLY increase the number of blocks
113+
// eligible for auto-compaction (capped at 1.5x the threshold); this adjustment is intended to help reduce
114+
// fragmentation over time. NOTE(review): the subtraction above assumes perfect_block_count <= block_count — confirm.
115+
let compact_num_block_hint = std::cmp::min(
116+
imperfect_count,
117+
(auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64,
118+
);
119+
info!("set compact_num_block_hint to {compact_num_block_hint }");
120+
ctx.set_compaction_num_block_hint(table_name, compact_num_block_hint);
121+
}
122+
Ok(())
123+
}

src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use log::error;
4242
use log::info;
4343

4444
use crate::operations::set_backoff;
45+
use crate::operations::set_compaction_num_block_hint;
4546
use crate::operations::AppendGenerator;
4647
use crate::operations::CommitMeta;
4748
use crate::operations::SnapshotGenerator;
@@ -272,15 +273,21 @@ async fn build_update_table_meta_req(
272273
let table_stats_gen = fuse_table
273274
.generate_table_stats(&previous, insert_hll, insert_rows)
274275
.await?;
276+
let table_info = table.get_table_info();
275277
let snapshot = snapshot_generator.generate_new_snapshot(
276-
table.get_table_info(),
278+
table_info,
277279
fuse_table.cluster_key_id(),
278280
previous,
279281
txn_mgr,
280282
table_meta_timestamps,
281283
table_stats_gen,
282284
)?;
283285
snapshot.ensure_segments_unique()?;
286+
let _ = set_compaction_num_block_hint(
287+
snapshot_generator.ctx.as_ref(),
288+
table_info.name.as_str(),
289+
&snapshot.summary,
290+
);
284291

285292
// write snapshot
286293
let dal = fuse_table.get_operator();

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use opendal::Operator;
5555

5656
use crate::io::TableMetaLocationGenerator;
5757
use crate::operations::set_backoff;
58+
use crate::operations::set_compaction_num_block_hint;
5859
use crate::operations::vacuum::vacuum_table;
5960
use crate::operations::AppendGenerator;
6061
use crate::operations::CommitMeta;
@@ -439,6 +440,11 @@ where F: SnapshotGenerator + Send + Sync + 'static
439440
table_stats_gen,
440441
) {
441442
Ok(snapshot) => {
443+
let _ = set_compaction_num_block_hint(
444+
self.ctx.as_ref(),
445+
table_info.name.as_str(),
446+
&snapshot.summary,
447+
);
442448
self.state = State::TryCommit {
443449
data: snapshot.to_bytes()?,
444450
snapshot,

0 commit comments

Comments
 (0)