@@ -78,46 +78,50 @@ async fn do_hook_compact(
     }
 
     pipeline.set_on_finished(move |info: &ExecutionInfo| {
-        let compaction_limits = match compact_target.mutation_kind {
-            MutationKind::Insert => {
-                let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
-                info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
-                if compaction_num_block_hint == 0 {
-                    return Ok(());
-                }
-                CompactionLimits {
-                    segment_limit: None,
-                    block_limit: Some(compaction_num_block_hint as usize),
+        if info.res.is_ok() {
+            let op_name = &trace_ctx.operation_name;
+            metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
+            info!("execute {op_name} finished successfully. running table optimization job.");
+
+            let compact_start_at = Instant::now();
+            let compaction_limits = match compact_target.mutation_kind {
+                MutationKind::Insert => {
+                    let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
+                    info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
+                    if compaction_num_block_hint == 0 {
+                        return Ok(());
+                    }
+                    CompactionLimits {
+                        segment_limit: None,
+                        block_limit: Some(compaction_num_block_hint as usize),
+                    }
                 }
-            }
-            _ =>
-            // for mutations other than Insertions, we use an empirical value of 3 segments as the
-            // limit for compaction. to be refined later.
-            {
+                _ => {
                     let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
                     CompactionLimits {
                         segment_limit: Some(auto_compaction_segments_limit as usize),
                         block_limit: None,
                     }
                 }
-        };
+            };
 
-        let op_name = &trace_ctx.operation_name;
-        metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
+            // keep the original progress value
+            let progress = ctx.get_write_progress();
+            let progress_value = progress.as_ref().get_values();
 
-        let compact_start_at = Instant::now();
-        if info.res.is_ok() {
-            info!("execute {op_name} finished successfully. running table optimization job.");
             match GlobalIORuntime::instance().block_on({
                 compact_table(ctx, compact_target, compaction_limits, lock_opt)
             }) {
                 Ok(_) => {
                     info!("execute {op_name} finished successfully. table optimization job finished.");
                 }
-                Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e) }
+                Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e); }
             }
+
+            // reset the progress value
+            progress.set(&progress_value);
+            metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
         }
-        metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
 
         Ok(())
     });
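Note: this hunk also moves the progress snapshot into the hook. The statement's write-progress counter is saved before the background compaction runs and restored afterwards, so rows written by the optimization job are not attributed to the user's statement. A minimal sketch of that save/restore pattern, using a hypothetical Progress type in place of databend's ctx.get_write_progress():

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for databend's write-progress counter.
struct Progress {
    rows: AtomicUsize,
}

impl Progress {
    fn get_values(&self) -> usize {
        self.rows.load(Ordering::Relaxed)
    }
    fn set(&self, v: &usize) {
        self.rows.store(*v, Ordering::Relaxed);
    }
    fn incr(&self, n: usize) {
        self.rows.fetch_add(n, Ordering::Relaxed);
    }
}

fn main() {
    let progress = Progress { rows: AtomicUsize::new(100) };

    // keep the original progress value before the background job runs
    let progress_value = progress.get_values();

    // the compaction pipeline also reports written rows ...
    progress.incr(42);

    // ... so reset the counter once the job is done
    progress.set(&progress_value);
    assert_eq!(progress.get_values(), 100);
}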
@@ -141,7 +145,7 @@ async fn compact_table(
         &compact_target.table,
     )
     .await?;
-    let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();
+    let settings = ctx.get_settings();
 
     // evict the table from cache
     ctx.evict_table_from_cache(
@@ -150,56 +154,60 @@
         &compact_target.table,
     )?;
 
-    let mut build_res = if do_recluster {
-        let recluster = RelOperator::Recluster(Recluster {
-            catalog: compact_target.catalog,
-            database: compact_target.database,
-            table: compact_target.table,
-            filters: None,
-            limit: compaction_limits.segment_limit,
-        });
-        let s_expr = SExpr::create_leaf(Arc::new(recluster));
-        let recluster_interpreter =
-            ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
-        recluster_interpreter.execute2().await?
-    } else {
+    {
+        // do compact.
         let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
-            catalog: compact_target.catalog,
-            database: compact_target.database,
-            table: compact_target.table,
-            limit: compaction_limits,
+            catalog: compact_target.catalog.clone(),
+            database: compact_target.database.clone(),
+            table: compact_target.table.clone(),
+            limit: compaction_limits.clone(),
         });
         let s_expr = SExpr::create_leaf(Arc::new(compact_block));
-        let compact_interpreter =
-            OptimizeCompactBlockInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
-        compact_interpreter.execute2().await?
-    };
-
-    if build_res.main_pipeline.is_empty() {
-        return Ok(());
+        let compact_interpreter = OptimizeCompactBlockInterpreter::try_create(
+            ctx.clone(),
+            s_expr,
+            lock_opt.clone(),
+            false,
+        )?;
+        let mut build_res = compact_interpreter.execute2().await?;
+        // execute the compact pipeline
+        if build_res.main_pipeline.is_complete_pipeline()? {
+            build_res.set_max_threads(settings.get_max_threads()? as usize);
+            let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
+
+            let mut pipelines = build_res.sources_pipelines;
+            pipelines.push(build_res.main_pipeline);
+
+            let complete_executor =
+                PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
+
+            // Clears previously generated segment locations to avoid duplicate data in the refresh phase
+            ctx.clear_segment_locations()?;
+            ctx.set_executor(complete_executor.get_inner())?;
+            complete_executor.execute()?;
+            drop(complete_executor);
+        }
     }
 
-    // execute the compact pipeline (for table with cluster keys, re-cluster will also be executed)
-    let settings = ctx.get_settings();
-    build_res.set_max_threads(settings.get_max_threads()? as usize);
-    let settings = ExecutorSettings::try_create(ctx.clone())?;
-
-    if build_res.main_pipeline.is_complete_pipeline()? {
-        let mut pipelines = build_res.sources_pipelines;
-        pipelines.push(build_res.main_pipeline);
-
-        let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
-
-        // keep the original progress value
-        let progress_value = ctx.get_write_progress_value();
-        // Clears previously generated segment locations to avoid duplicate data in the refresh phase
-        ctx.clear_segment_locations()?;
-        ctx.set_executor(complete_executor.get_inner())?;
-        complete_executor.execute()?;
-        drop(complete_executor);
-
-        // reset the progress value
-        ctx.get_write_progress().set(&progress_value);
+    {
+        // do recluster.
+        if !table.cluster_keys(ctx.clone()).is_empty() {
+            let recluster = RelOperator::Recluster(Recluster {
+                catalog: compact_target.catalog,
+                database: compact_target.database,
+                table: compact_target.table,
+                filters: None,
+                limit: Some(settings.get_auto_compaction_segments_limit()? as usize),
+            });
+            let s_expr = SExpr::create_leaf(Arc::new(recluster));
+            let recluster_interpreter =
+                ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
+            // Recluster will be done in `ReclusterTableInterpreter::execute2` directly,
+            // we do not need to use `PipelineCompleteExecutor` to execute it.
+            let build_res = recluster_interpreter.execute2().await?;
+            assert!(build_res.main_pipeline.is_empty());
+        }
    }
+
    Ok(())
 }
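Note: the structural change in this last hunk is that the old code ran either recluster or compact, chosen by whether the table had cluster keys, while the new code always compacts first and then reclusters clustered tables as a separate step. A minimal control-flow sketch, with hypothetical compact/recluster/optimize helpers standing in for the interpreters:

// Hypothetical table handle; only the cluster-key check matters here.
struct Table {
    cluster_keys: Vec<String>,
}

fn compact(_table: &Table) -> Result<(), String> {
    // phase 1: always runs; the real code builds an OptimizeCompactBlock plan
    // and drives it to completion with PipelineCompleteExecutor.
    println!("compact blocks");
    Ok(())
}

fn recluster(_table: &Table) -> Result<(), String> {
    // phase 2: clustered tables only; in the real code the whole job runs
    // inside ReclusterTableInterpreter::execute2, so the returned pipeline
    // is expected to be empty.
    println!("recluster");
    Ok(())
}

fn optimize(table: &Table) -> Result<(), String> {
    compact(table)?;
    if !table.cluster_keys.is_empty() {
        recluster(table)?;
    }
    Ok(())
}

fn main() -> Result<(), String> {
    optimize(&Table { cluster_keys: vec!["id".to_string()] })
}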