Skip to content

Commit 37ef891

Browse files
committed
chore: Merge the logic that triggers automatic compaction upon write
1 parent 361c0ab commit 37ef891

File tree

8 files changed

+59
-53
lines changed

8 files changed

+59
-53
lines changed

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use databend_common_exception::Result;
2626
use databend_common_pipeline::core::always_callback;
2727
use databend_common_pipeline::core::ExecutionInfo;
2828
use databend_common_pipeline::core::Pipeline;
29-
use databend_common_sql::executor::physical_plans::MutationKind;
3029
use databend_common_sql::optimizer::ir::SExpr;
3130
use databend_common_sql::plans::OptimizeCompactBlock;
3231
use databend_common_sql::plans::ReclusterPlan;
@@ -50,7 +49,6 @@ pub struct CompactTargetTableDescription {
5049
pub catalog: String,
5150
pub database: String,
5251
pub table: String,
53-
pub mutation_kind: MutationKind,
5452
}
5553

5654
pub struct CompactHookTraceCtx {
@@ -95,30 +93,18 @@ async fn do_hook_compact(
9593
info!("Operation {op_name} completed successfully, starting table optimization job.");
9694

9795
let compact_start_at = Instant::now();
98-
let compaction_limits = match compact_target.mutation_kind {
99-
MutationKind::Insert => {
100-
let compaction_num_block_hint =
101-
ctx.get_compaction_num_block_hint(&compact_target.table);
102-
info!(
103-
"Table {} requires compaction of {} blocks",
104-
compact_target.table, compaction_num_block_hint
105-
);
106-
if compaction_num_block_hint == 0 {
107-
return Ok(());
108-
}
109-
CompactionLimits {
110-
segment_limit: None,
111-
block_limit: Some(compaction_num_block_hint as usize),
112-
}
113-
}
114-
_ => {
115-
let auto_compaction_segments_limit =
116-
ctx.get_settings().get_auto_compaction_segments_limit()?;
117-
CompactionLimits {
118-
segment_limit: Some(auto_compaction_segments_limit as usize),
119-
block_limit: None,
120-
}
121-
}
96+
let compaction_num_block_hint =
97+
ctx.get_compaction_num_block_hint(&compact_target.table);
98+
info!(
99+
"Table {} requires compaction of {} blocks",
100+
compact_target.table, compaction_num_block_hint
101+
);
102+
if compaction_num_block_hint == 0 {
103+
return Ok(());
104+
}
105+
let compaction_limits = CompactionLimits {
106+
segment_limit: None,
107+
block_limit: Some(compaction_num_block_hint as usize),
122108
};
123109

124110
// keep the original progress value

src/query/service/src/interpreters/hook/hook.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ impl HookOperator {
102102
catalog: self.catalog.to_owned(),
103103
database: self.database.to_owned(),
104104
table: self.table.to_owned(),
105-
mutation_kind: self.mutation_kind,
106105
};
107106

108107
let trace_ctx = CompactHookTraceCtx {

src/query/settings/src/settings_default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ impl DefaultSettings {
892892
}),
893893
("auto_compaction_segments_limit", DefaultSettingValue {
894894
value: UserSettingValue::UInt64(3),
895-
desc: "The maximum number of segments that can be compacted automatically triggered after write(replace-into/merge-into).",
895+
desc: "The maximum number of segments that can be reclustered automatically triggered after write.",
896896
mode: SettingMode::Both,
897897
scope: SettingScope::Both,
898898
range: Some(SettingRange::Numeric(2..=u64::MAX)),

src/query/storages/fuse/src/operations/common/generators/append_generator.rs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use databend_storages_common_table_meta::meta::ColumnStatistics;
3030
use databend_storages_common_table_meta::meta::Statistics;
3131
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
3232
use databend_storages_common_table_meta::meta::TableSnapshot;
33-
use log::info;
3433
use log::warn;
3534

3635
use crate::operations::common::ConflictResolveContext;
@@ -41,7 +40,8 @@ use crate::statistics::TableStatsGenerator;
4140

4241
#[derive(Clone)]
4342
pub struct AppendGenerator {
44-
ctx: Arc<dyn TableContext>,
43+
pub(crate) ctx: Arc<dyn TableContext>,
44+
4545
leaf_default_values: HashMap<ColumnId, Scalar>,
4646
overwrite: bool,
4747
conflict_resolve_ctx: ConflictResolveContext,
@@ -194,28 +194,6 @@ impl SnapshotGenerator for AppendGenerator {
194194
}
195195
}
196196

197-
// check if need to auto compact
198-
// the algorithm is: if the number of imperfect blocks is greater than or equal to the threshold, then auto compact.
199-
// the threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 25.
200-
let imperfect_count = new_summary.block_count - new_summary.perfect_block_count;
201-
let auto_compaction_imperfect_blocks_threshold = self
202-
.ctx
203-
.get_settings()
204-
.get_auto_compaction_imperfect_blocks_threshold()?;
205-
206-
if imperfect_count >= auto_compaction_imperfect_blocks_threshold {
207-
// If imperfect_count is larger, SLIGHTLY increase the number of blocks
208-
// eligible for auto-compaction, this adjustment is intended to help reduce
209-
// fragmentation over time.
210-
let compact_num_block_hint = std::cmp::min(
211-
imperfect_count,
212-
(auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64,
213-
);
214-
info!("set compact_num_block_hint to {compact_num_block_hint }");
215-
self.ctx
216-
.set_compaction_num_block_hint(table_info.name.as_str(), compact_num_block_hint);
217-
}
218-
219197
// merge statistics will set the additional_stats_meta to none,
220198
// so reset additional_stats_meta here.
221199
let table_statistics_location = table_stats_gen.table_statistics_location();

src/query/storages/fuse/src/operations/common/generators/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ pub use conflict_resolve_context::SnapshotChanges;
2424
pub use conflict_resolve_context::SnapshotMerged;
2525
pub use mutation_generator::MutationGenerator;
2626
pub use snapshot_generator::decorate_snapshot;
27+
pub(crate) use snapshot_generator::set_compaction_num_block_hint;
2728
pub use snapshot_generator::SnapshotGenerator;
2829
pub use truncate_generator::TruncateGenerator;

src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
use std::any::Any;
1616
use std::sync::Arc;
1717

18+
use databend_common_catalog::table_context::TableContext;
1819
use databend_common_exception::Result;
1920
use databend_common_expression::TableSchema;
2021
use databend_common_meta_app::schema::TableInfo;
2122
use databend_storages_common_session::TxnManagerRef;
23+
use databend_storages_common_table_meta::meta::Statistics;
2224
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
2325
use databend_storages_common_table_meta::meta::TableSnapshot;
26+
use log::info;
2427

2528
use crate::operations::common::ConflictResolveContext;
2629
use crate::statistics::TableStatsGenerator;
@@ -92,3 +95,29 @@ pub fn decorate_snapshot(
9295
}
9396
Ok(())
9497
}
98+
99+
pub(crate) fn set_compaction_num_block_hint(
100+
ctx: &dyn TableContext,
101+
table_name: &str,
102+
summary: &Statistics,
103+
) -> Result<()> {
104+
// check if need to auto compact
105+
// the algorithm is: if the number of imperfect blocks is greater than or equal to the threshold, then auto compact.
106+
// the threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 25.
107+
let imperfect_count = summary.block_count - summary.perfect_block_count;
108+
let auto_compaction_imperfect_blocks_threshold = ctx
109+
.get_settings()
110+
.get_auto_compaction_imperfect_blocks_threshold()?;
111+
if imperfect_count >= auto_compaction_imperfect_blocks_threshold {
112+
// If imperfect_count is larger, SLIGHTLY increase the number of blocks
113+
// eligible for auto-compaction, this adjustment is intended to help reduce
114+
// fragmentation over time.
115+
let compact_num_block_hint = std::cmp::min(
116+
imperfect_count,
117+
(auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64,
118+
);
119+
info!("set compact_num_block_hint to {compact_num_block_hint }");
120+
ctx.set_compaction_num_block_hint(table_name, compact_num_block_hint);
121+
}
122+
Ok(())
123+
}

src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use log::error;
4242
use log::info;
4343

4444
use crate::operations::set_backoff;
45+
use crate::operations::set_compaction_num_block_hint;
4546
use crate::operations::AppendGenerator;
4647
use crate::operations::CommitMeta;
4748
use crate::operations::SnapshotGenerator;
@@ -272,15 +273,21 @@ async fn build_update_table_meta_req(
272273
let table_stats_gen = fuse_table
273274
.generate_table_stats(&previous, insert_hll, insert_rows)
274275
.await?;
276+
let table_info = table.get_table_info();
275277
let snapshot = snapshot_generator.generate_new_snapshot(
276-
table.get_table_info(),
278+
table_info,
277279
fuse_table.cluster_key_id(),
278280
previous,
279281
txn_mgr,
280282
table_meta_timestamps,
281283
table_stats_gen,
282284
)?;
283285
snapshot.ensure_segments_unique()?;
286+
let _ = set_compaction_num_block_hint(
287+
snapshot_generator.ctx.as_ref(),
288+
table_info.name.as_str(),
289+
&snapshot.summary,
290+
);
284291

285292
// write snapshot
286293
let dal = fuse_table.get_operator();

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use opendal::Operator;
5858

5959
use crate::io::TableMetaLocationGenerator;
6060
use crate::operations::set_backoff;
61+
use crate::operations::set_compaction_num_block_hint;
6162
use crate::operations::vacuum::vacuum_table;
6263
use crate::operations::AppendGenerator;
6364
use crate::operations::CommitMeta;
@@ -442,6 +443,11 @@ where F: SnapshotGenerator + Send + Sync + 'static
442443
table_stats_gen,
443444
) {
444445
Ok(snapshot) => {
446+
let _ = set_compaction_num_block_hint(
447+
self.ctx.as_ref(),
448+
table_info.name.as_str(),
449+
&snapshot.summary,
450+
);
445451
self.state = State::TryCommit {
446452
data: snapshot.to_bytes()?,
447453
snapshot,

0 commit comments

Comments
 (0)