Skip to content

Commit 529f896

Browse files
authored
fix(query): support vacuum temp files for create and refresh (#17421)
1 parent 0a69069 commit 529f896

File tree

5 files changed

+70
-9
lines changed

5 files changed

+70
-9
lines changed

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_common_catalog::lock::LockTableOption;
2020
use databend_common_catalog::table::CompactionLimits;
2121
use databend_common_catalog::table_context::TableContext;
2222
use databend_common_exception::Result;
23+
use databend_common_pipeline_core::always_callback;
2324
use databend_common_pipeline_core::ExecutionInfo;
2425
use databend_common_pipeline_core::Pipeline;
2526
use databend_common_sql::executor::physical_plans::MutationKind;
@@ -32,6 +33,9 @@ use log::info;
3233

3334
use crate::interpreters::common::metrics_inc_compact_hook_compact_time_ms;
3435
use crate::interpreters::common::metrics_inc_compact_hook_main_operation_time_ms;
36+
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
37+
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
38+
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
3539
use crate::interpreters::Interpreter;
3640
use crate::interpreters::OptimizeCompactBlockInterpreter;
3741
use crate::interpreters::ReclusterTableInterpreter;
@@ -176,6 +180,16 @@ async fn compact_table(
176180
let mut build_res = compact_interpreter.execute2().await?;
177181
// execute the compact pipeline
178182
if build_res.main_pipeline.is_complete_pipeline()? {
183+
let query_ctx = ctx.clone();
184+
build_res.main_pipeline.set_on_finished(always_callback(
185+
move |_info: &ExecutionInfo| {
186+
hook_clear_m_cte_temp_table(&query_ctx)?;
187+
hook_vacuum_temp_files(&query_ctx)?;
188+
hook_disk_temp_dir(&query_ctx)?;
189+
Ok(())
190+
},
191+
));
192+
179193
build_res.set_max_threads(settings.get_max_threads()? as usize);
180194
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
181195

src/query/service/src/interpreters/hook/refresh_hook.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_meta_app::schema::IndexMeta;
2323
use databend_common_meta_app::schema::ListIndexesByIdReq;
2424
use databend_common_meta_app::schema::ListVirtualColumnsReq;
2525
use databend_common_meta_types::MetaId;
26+
use databend_common_pipeline_core::always_callback;
2627
use databend_common_pipeline_core::ExecutionInfo;
2728
use databend_common_pipeline_core::Pipeline;
2829
use databend_common_sql::plans::Plan;
@@ -37,6 +38,9 @@ use databend_storages_common_table_meta::meta::Location;
3738
use log::info;
3839
use parking_lot::RwLock;
3940

41+
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
42+
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
43+
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
4044
use crate::interpreters::Interpreter;
4145
use crate::interpreters::RefreshIndexInterpreter;
4246
use crate::interpreters::RefreshTableIndexInterpreter;
@@ -130,6 +134,16 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
130134
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;
131135

132136
if build_res.main_pipeline.is_complete_pipeline()? {
137+
let query_ctx = ctx_cloned.clone();
138+
build_res.main_pipeline.set_on_finished(always_callback(
139+
move |_: &ExecutionInfo| {
140+
hook_clear_m_cte_temp_table(&query_ctx)?;
141+
hook_vacuum_temp_files(&query_ctx)?;
142+
hook_disk_temp_dir(&query_ctx)?;
143+
Ok(())
144+
},
145+
));
146+
133147
let mut pipelines = build_res.sources_pipelines;
134148
pipelines.push(build_res.main_pipeline);
135149

@@ -157,6 +171,16 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
157171
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;
158172

159173
if build_res.main_pipeline.is_complete_pipeline()? {
174+
let query_ctx = ctx_cloned.clone();
175+
build_res.main_pipeline.set_on_finished(always_callback(
176+
move |_info: &ExecutionInfo| {
177+
hook_clear_m_cte_temp_table(&query_ctx)?;
178+
hook_vacuum_temp_files(&query_ctx)?;
179+
hook_disk_temp_dir(&query_ctx)?;
180+
Ok(())
181+
},
182+
));
183+
160184
let mut pipelines = build_res.sources_pipelines;
161185
pipelines.push(build_res.main_pipeline);
162186

@@ -184,6 +208,16 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
184208
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;
185209

186210
if build_res.main_pipeline.is_complete_pipeline()? {
211+
let query_ctx = ctx_cloned.clone();
212+
build_res.main_pipeline.set_on_finished(always_callback(
213+
move |_: &ExecutionInfo| {
214+
hook_clear_m_cte_temp_table(&query_ctx)?;
215+
hook_vacuum_temp_files(&query_ctx)?;
216+
hook_disk_temp_dir(&query_ctx)?;
217+
Ok(())
218+
},
219+
));
220+
187221
let mut pipelines = build_res.sources_pipelines;
188222
pipelines.push(build_res.main_pipeline);
189223

src/query/service/src/interpreters/hook/vacuum_hook.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,11 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc<QueryContext>) -> Result<()> {
102102

103103
Ok(())
104104
}
105+
106+
pub fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
107+
let _ = GlobalIORuntime::instance().block_on(async move {
108+
query_ctx.drop_m_cte_temp_table().await?;
109+
Ok(())
110+
});
111+
Ok(())
112+
}

src/query/service/src/interpreters/interpreter.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use databend_common_base::base::short_sql;
2727
use databend_common_base::runtime::profile::get_statistics_desc;
2828
use databend_common_base::runtime::profile::ProfileDesc;
2929
use databend_common_base::runtime::profile::ProfileStatisticsName;
30-
use databend_common_base::runtime::GlobalIORuntime;
3130
use databend_common_catalog::query_kind::QueryKind;
3231
use databend_common_catalog::table_context::TableContext;
3332
use databend_common_exception::ErrorCode;
@@ -50,6 +49,7 @@ use log::info;
5049
use md5::Digest;
5150
use md5::Md5;
5251

52+
use super::hook::vacuum_hook::hook_clear_m_cte_temp_table;
5353
use super::hook::vacuum_hook::hook_disk_temp_dir;
5454
use super::hook::vacuum_hook::hook_vacuum_temp_files;
5555
use super::InterpreterMetrics;
@@ -360,11 +360,3 @@ fn need_acquire_lock(ctx: Arc<QueryContext>, stmt: &Statement) -> bool {
360360
_ => false,
361361
}
362362
}
363-
364-
fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
365-
let _ = GlobalIORuntime::instance().block_on(async move {
366-
query_ctx.drop_m_cte_temp_table().await?;
367-
Ok(())
368-
});
369-
Ok(())
370-
}

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use databend_common_meta_app::schema::TableMeta;
3737
use databend_common_meta_app::schema::TableNameIdent;
3838
use databend_common_meta_app::schema::TableStatistics;
3939
use databend_common_meta_types::MatchSeq;
40+
use databend_common_pipeline_core::always_callback;
4041
use databend_common_pipeline_core::ExecutionInfo;
4142
use databend_common_sql::field_default_value;
4243
use databend_common_sql::plans::CreateTablePlan;
@@ -63,6 +64,9 @@ use crate::interpreters::common::table_option_validation::is_valid_create_opt;
6364
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
6465
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
6566
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
67+
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
68+
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
69+
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
6670
use crate::interpreters::InsertInterpreter;
6771
use crate::interpreters::Interpreter;
6872
use crate::pipelines::PipelineBuildResult;
@@ -242,6 +246,15 @@ impl CreateTableInterpreter {
242246
let db_name = self.plan.database.clone();
243247
let table_name = self.plan.table.clone();
244248

249+
let query_ctx = self.ctx.clone();
250+
pipeline
251+
.main_pipeline
252+
.set_on_finished(always_callback(move |_: &ExecutionInfo| {
253+
hook_clear_m_cte_temp_table(&query_ctx)?;
254+
hook_vacuum_temp_files(&query_ctx)?;
255+
hook_disk_temp_dir(&query_ctx)?;
256+
Ok(())
257+
}));
245258
// Add a callback to restore table visibility upon successful insert pipeline completion.
246259
// As there might be previous on_finish callbacks(e.g. refresh/compact/re-cluster hooks) which
247260
// depend on the table being visible, this callback is added at the beginning of the on_finish

0 commit comments

Comments
 (0)