Skip to content

Commit 0b1d01a

Browse files
authored
chore(query): continue the vacuum query hook even aborted (#17497)
1 parent f448778 commit 0b1d01a

File tree

3 files changed

+7
-5
lines changed

3 files changed

+7
-5
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_temporary_files.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,10 @@ async fn vacuum_by_duration(
173173
Ok(removed_total)
174174
}
175175

176+
// Vacuum temporary files by query hook
177+
// If query was killed, we still need to clean up the temporary files
176178
async fn vacuum_query_hook(
177-
abort_checker: AbortChecker,
179+
_abort_checker: AbortChecker,
178180
temporary_dir: &str,
179181
nodes: &[usize],
180182
query_id: &str,
@@ -198,7 +200,6 @@ async fn vacuum_query_hook(
198200
.filter_map(|x| x.is_ok().then(|| x.unwrap()));
199201

200202
for (meta_file_path, buffer) in metas {
201-
abort_checker.try_check_aborting()?;
202203
let removed = vacuum_by_meta_buffer(
203204
&meta_file_path,
204205
temporary_dir,

src/query/service/src/interpreters/hook/vacuum_hook.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
4545

4646
let cluster = query_ctx.get_cluster();
4747
let query_id = query_ctx.get_id();
48-
48+
let abort_checker = query_ctx.clone().get_abort_checker();
4949
let mut node_files = HashMap::new();
5050
for node in cluster.nodes.iter() {
5151
let stats = query_ctx.get_spill_file_stats(Some(node.id.clone()));
52-
if stats.file_nums != 0 {
52+
// if query was aborted, we can't trust the stats
53+
if stats.file_nums != 0 || abort_checker.try_check_aborting().is_err() {
5354
if let Some(index) = cluster.index_of_nodeid(&node.id) {
5455
node_files.insert(index, stats.file_nums);
5556
}
@@ -65,7 +66,6 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
6566
);
6667

6768
let nodes = node_files.keys().cloned().collect::<Vec<usize>>();
68-
let abort_checker = query_ctx.clone().get_abort_checker();
6969
let _ = GlobalIORuntime::instance().block_on::<(), ErrorCode, _>(async move {
7070
let removed_files = handler
7171
.do_vacuum_temporary_files(

src/query/service/src/sessions/query_ctx.rs

+1
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ impl QueryContext {
401401
pub fn get_spill_file_stats(&self, node_id: Option<String>) -> SpillProgress {
402402
let r = self.shared.cluster_spill_progress.read();
403403
let node_id = node_id.unwrap_or(self.get_cluster().local_id());
404+
404405
r.get(&node_id).cloned().unwrap_or(SpillProgress::default())
405406
}
406407

0 commit comments

Comments
 (0)