Skip to content

Commit 61cbca2

Browse files
committed
upd
1 parent 01fd754 commit 61cbca2

8 files changed

Lines changed: 80 additions & 74 deletions

be/src/exec/operator/distinct_streaming_aggregation_operator.cpp

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -153,18 +153,14 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
153153

154154
size_t key_size = _probe_expr_ctxs.size();
155155
ColumnRawPtrs key_columns(key_size);
156-
std::vector<int> result_idxs(key_size);
156+
Columns key_column_ptrs(key_size);
157157
{
158158
SCOPED_TIMER(_expr_timer);
159159
for (size_t i = 0; i < key_size; ++i) {
160-
int result_column_id = -1;
161-
RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id));
162-
in_block->get_by_position(result_column_id).column =
163-
in_block->get_by_position(result_column_id)
164-
.column->convert_to_full_column_if_const();
165-
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
160+
RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, key_column_ptrs[i]));
161+
key_column_ptrs[i] = key_column_ptrs[i]->convert_to_full_column_if_const();
162+
key_columns[i] = key_column_ptrs[i].get();
166163
key_columns[i]->assume_mutable()->replace_float_special_values();
167-
result_idxs[i] = result_column_id;
168164
}
169165
}
170166

@@ -198,12 +194,9 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
198194
}
199195
DCHECK_EQ(out_block->columns(), key_size);
200196
if (_stop_emplace_flag && _distinct_row.empty()) {
201-
// If _stop_emplace_flag is true and _distinct_row is also empty, it means it is in streaming mode, outputting what is input
202-
// swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1)
197+
// Streaming mode: move key columns directly into output block
203198
for (int i = 0; i < key_size; ++i) {
204-
auto output_column = out_block->get_by_position(i).column;
205-
out_block->replace_by_position(i, key_columns[i]->assume_mutable());
206-
in_block->replace_by_position(result_idxs[i], output_column);
199+
out_block->replace_by_position(i, std::move(key_column_ptrs[i]));
207200
}
208201
} else {
209202
DCHECK_EQ(_cache_block.rows(), 0);
@@ -231,7 +224,7 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
231224
ColumnsWithTypeAndName columns_with_schema;
232225
for (int i = 0; i < key_size; ++i) {
233226
if (_stop_emplace_flag) {
234-
columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
227+
columns_with_schema.emplace_back(std::move(key_column_ptrs[i]),
235228
_probe_expr_ctxs[i]->root()->data_type(),
236229
_probe_expr_ctxs[i]->root()->expr_name());
237230
} else {

be/src/exec/operator/hashjoin_probe_operator.cpp

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,9 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) {
135135
return JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>::close(state);
136136
}
137137

138-
bool HashJoinProbeLocalState::_need_probe_null_map(Block& block,
139-
const std::vector<int>& res_col_ids) {
138+
bool HashJoinProbeLocalState::_need_probe_null_map(const std::vector<ColumnPtr>& res_columns) {
140139
for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
141-
const auto* column = block.get_by_position(res_col_ids[i]).column.get();
140+
const auto* column = res_columns[i].get();
142141
if (column->is_nullable() &&
143142
!_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
144143
return true;
@@ -329,7 +328,7 @@ std::string HashJoinProbeLocalState::debug_string(int indentation_level) const {
329328
}
330329

331330
Status HashJoinProbeLocalState::_extract_join_column(Block& block,
332-
const std::vector<int>& res_col_ids) {
331+
const std::vector<ColumnPtr>& res_columns) {
333332
if (empty_right_table_shortcut()) {
334333
return Status::OK();
335334
}
@@ -338,7 +337,7 @@ Status HashJoinProbeLocalState::_extract_join_column(Block& block,
338337

339338
if (!_has_set_need_null_map_for_probe) {
340339
_has_set_need_null_map_for_probe = true;
341-
_need_null_map_for_probe = _need_probe_null_map(block, res_col_ids);
340+
_need_null_map_for_probe = _need_probe_null_map(res_columns);
342341
}
343342
if (_need_null_map_for_probe) {
344343
if (!_null_map_column) {
@@ -349,11 +348,10 @@ Status HashJoinProbeLocalState::_extract_join_column(Block& block,
349348

350349
auto& shared_state = *_shared_state;
351350
for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
352-
const auto* column = block.get_by_position(res_col_ids[i]).column.get();
351+
const auto* column = res_columns[i].get();
353352
if (!column->is_nullable() &&
354353
_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) {
355-
_key_columns_holder.emplace_back(
356-
make_nullable(block.get_by_position(res_col_ids[i]).column));
354+
_key_columns_holder.emplace_back(make_nullable(res_columns[i]));
357355
_probe_columns[i] = _key_columns_holder.back().get();
358356
} else if (const auto* nullable = check_and_get_column<ColumnNullable>(*column);
359357
nullable &&
@@ -363,8 +361,11 @@ Status HashJoinProbeLocalState::_extract_join_column(Block& block,
363361
const auto& col_nullmap = nullable->get_null_map_data();
364362
DCHECK(_null_map_column);
365363
VectorizedUtils::update_null_map(_null_map_column->get_data(), col_nullmap);
364+
// Hold the column to keep nested column pointer alive
365+
_key_columns_holder.emplace_back(res_columns[i]);
366366
_probe_columns[i] = &col_nested;
367367
} else {
368+
_key_columns_holder.emplace_back(res_columns[i]);
368369
_probe_columns[i] = column;
369370
}
370371
}
@@ -411,19 +412,16 @@ bool HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const {
411412

412413
Status HashJoinProbeOperatorX::_do_evaluate(Block& block, VExprContextSPtrs& exprs,
413414
RuntimeProfile::Counter& expr_call_timer,
414-
std::vector<int>& res_col_ids) const {
415+
std::vector<ColumnPtr>& res_columns) const {
415416
for (size_t i = 0; i < exprs.size(); ++i) {
416-
int result_col_id = -1;
417-
// execute build column
417+
// execute probe column
418418
{
419419
SCOPED_TIMER(&expr_call_timer);
420-
RETURN_IF_ERROR(exprs[i]->execute(&block, &result_col_id));
420+
RETURN_IF_ERROR(exprs[i]->execute(&block, res_columns[i]));
421421
}
422422

423423
// TODO: opt the column is const
424-
block.get_by_position(result_col_id).column =
425-
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
426-
res_col_ids[i] = result_col_id;
424+
res_columns[i] = res_columns[i]->convert_to_full_column_if_const();
427425
}
428426
return Status::OK();
429427
}
@@ -438,15 +436,17 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, Block* input_block, boo
438436

439437
if (rows > 0) {
440438
COUNTER_UPDATE(local_state._probe_rows_counter, rows);
441-
std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size());
439+
std::vector<ColumnPtr> res_columns(local_state._probe_expr_ctxs.size());
442440
RETURN_IF_ERROR(_do_evaluate(*input_block, local_state._probe_expr_ctxs,
443-
*local_state._probe_expr_call_timer, res_col_ids));
441+
*local_state._probe_expr_call_timer, res_columns));
444442
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
445443
local_state._probe_column_convert_to_null =
446444
local_state._convert_block_to_null(*input_block);
447445
}
448446

449-
RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids));
447+
RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_columns));
448+
// Release ColumnPtrs to restore reference counts on block columns
449+
res_columns.clear();
450450

451451
local_state._estimate_memory_usage += (input_block->allocated_bytes() - origin_size);
452452

be/src/exec/operator/hashjoin_probe_operator.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ class HashJoinProbeLocalState MOCK_REMOVE(final)
6969

7070
private:
7171
void _prepare_probe_block();
72-
bool _need_probe_null_map(Block& block, const std::vector<int>& res_col_ids);
72+
bool _need_probe_null_map(const std::vector<ColumnPtr>& res_columns);
7373
std::vector<uint16_t> _convert_block_to_null(Block& block);
74-
Status _extract_join_column(Block& block, const std::vector<int>& res_col_ids);
74+
Status _extract_join_column(Block& block, const std::vector<ColumnPtr>& res_columns);
7575
friend class HashJoinProbeOperatorX;
7676
template <int JoinOpType>
7777
friend struct ProcessHashTableProbe;
@@ -182,7 +182,7 @@ class HashJoinProbeOperatorX MOCK_REMOVE(final)
182182
private:
183183
Status _do_evaluate(Block& block, VExprContextSPtrs& exprs,
184184
RuntimeProfile::Counter& expr_call_timer,
185-
std::vector<int>& res_col_ids) const;
185+
std::vector<ColumnPtr>& res_columns) const;
186186
friend class HashJoinProbeLocalState;
187187
template <int JoinOpType>
188188
friend struct ProcessHashTableProbe;

be/src/exec/operator/set_probe_sink_operator.cpp

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,13 @@ Status SetProbeSinkOperatorX<is_intersect>::_extract_probe_column(
142142
auto& build_not_ignore_null = local_state._shared_state->build_not_ignore_null;
143143

144144
auto& child_exprs = local_state._child_exprs;
145+
local_state._probe_column_holders.resize(child_exprs.size());
145146
for (size_t i = 0; i < child_exprs.size(); ++i) {
146-
int result_col_id = -1;
147-
RETURN_IF_ERROR(child_exprs[i]->execute(&block, &result_col_id));
147+
ColumnPtr result_column;
148+
RETURN_IF_ERROR(child_exprs[i]->execute(&block, result_column));
149+
result_column = result_column->convert_to_full_column_if_const();
148150

149-
block.get_by_position(result_col_id).column =
150-
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
151-
const auto* column = block.get_by_position(result_col_id).column.get();
152-
153-
if (const auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
151+
if (const auto* nullable = check_and_get_column<ColumnNullable>(*result_column)) {
154152
if (!build_not_ignore_null[i]) {
155153
return Status::InternalError(
156154
"SET operator expects a nullable : {} column in column {}, but the "
@@ -160,16 +158,13 @@ Status SetProbeSinkOperatorX<is_intersect>::_extract_probe_column(
160158
nullable->get_nested_column_ptr()->is_nullable());
161159
}
162160
raw_ptrs[i] = nullable;
161+
local_state._probe_column_holders[i] = std::move(result_column);
163162
} else {
164163
if (build_not_ignore_null[i]) {
165-
auto column_ptr = make_nullable(block.get_by_position(result_col_id).column, false);
166-
local_state._probe_column_inserted_id.emplace_back(block.columns());
167-
block.insert(
168-
{column_ptr, make_nullable(block.get_by_position(result_col_id).type), ""});
169-
column = column_ptr.get();
164+
result_column = make_nullable(result_column, false);
170165
}
171-
172-
raw_ptrs[i] = column;
166+
raw_ptrs[i] = result_column.get();
167+
local_state._probe_column_holders[i] = std::move(result_column);
173168
}
174169
}
175170
return Status::OK();

be/src/exec/operator/set_probe_sink_operator.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ class SetProbeSinkLocalState final : public PipelineXSinkLocalState<SetSharedSta
5454

5555
int64_t _estimate_memory_usage = 0;
5656

57-
//record insert column id during probe
58-
std::vector<uint16_t> _probe_column_inserted_id;
57+
// Holds ColumnPtr references to keep _probe_columns raw pointers valid
58+
std::vector<ColumnPtr> _probe_column_holders;
5959
ColumnRawPtrs _probe_columns;
6060
// every child has its result expr list
6161
VExprContextSPtrs _child_exprs;

be/src/exec/scan/scanner.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,15 @@ Status Scanner::_do_projections(Block* origin_block, Block* output_block) {
189189
}
190190
Block input_block = *origin_block;
191191

192-
std::vector<int> result_column_ids;
193192
for (auto& projections : _intermediate_projections) {
194-
result_column_ids.resize(projections.size());
193+
ColumnsWithTypeAndName columns_with_schema;
194+
columns_with_schema.reserve(projections.size());
195195
for (int i = 0; i < projections.size(); i++) {
196-
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
196+
ColumnWithTypeAndName result_data;
197+
RETURN_IF_ERROR(projections[i]->execute(&input_block, result_data));
198+
columns_with_schema.emplace_back(std::move(result_data));
197199
}
198-
input_block.shuffle_columns(result_column_ids);
200+
input_block = Block(std::move(columns_with_schema));
199201
}
200202

201203
DCHECK_EQ(rows, input_block.rows());

be/src/exprs/vectorized_agg_fn.cpp

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
#include "common/object_pool.h"
3232
#include "core/block/block.h"
3333
#include "core/block/column_with_type_and_name.h"
34-
#include "core/block/materialize_block.h"
3534
#include "core/data_type/data_type_agg_state.h"
3635
#include "core/data_type/data_type_factory.hpp"
3736
#include "exec/common/util.hpp"
@@ -282,28 +281,35 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) {
282281

283282
Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena& arena) {
284283
RETURN_IF_ERROR(_calc_argument_columns(block));
285-
_function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena);
284+
_function->add_batch_single_place(block->rows(), place, _agg_raw_input_columns.data(), arena);
285+
_reset_input_columns();
286286
return Status::OK();
287287
}
288288

289289
Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
290290
Arena& arena, bool agg_many) {
291291
RETURN_IF_ERROR(_calc_argument_columns(block));
292-
_function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many);
292+
_function->add_batch(block->rows(), places, offset, _agg_raw_input_columns.data(), arena,
293+
agg_many);
294+
_reset_input_columns();
293295
return Status::OK();
294296
}
295297

296298
Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
297299
AggregateDataPtr* places, Arena& arena) {
298300
RETURN_IF_ERROR(_calc_argument_columns(block));
299-
_function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena);
301+
_function->add_batch_selected(block->rows(), places, offset, _agg_raw_input_columns.data(),
302+
arena);
303+
_reset_input_columns();
300304
return Status::OK();
301305
}
302306

303307
Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
304308
const size_t num_rows, Arena& arena) {
305309
RETURN_IF_ERROR(_calc_argument_columns(block));
306-
_function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena);
310+
_function->streaming_agg_serialize_to_column(_agg_raw_input_columns.data(), dst, num_rows,
311+
arena);
312+
_reset_input_columns();
307313
return Status::OK();
308314
}
309315

@@ -340,23 +346,28 @@ std::string AggFnEvaluator::debug_string() const {
340346
return out.str();
341347
}
342348

343-
Status AggFnEvaluator::_calc_argument_columns(Block* block) {
349+
Status AggFnEvaluator::_calc_argument_columns(const Block* block) {
344350
SCOPED_TIMER(_expr_timer);
345-
_agg_columns.resize(_input_exprs_ctxs.size());
346-
std::vector<int> column_ids(_input_exprs_ctxs.size());
351+
_agg_input_columns.resize(_input_exprs_ctxs.size(), nullptr);
352+
_agg_raw_input_columns.resize(_input_exprs_ctxs.size(), nullptr);
347353
for (int i = 0; i < _input_exprs_ctxs.size(); ++i) {
348-
int column_id = -1;
349-
RETURN_IF_ERROR(_input_exprs_ctxs[i]->execute(block, &column_id));
350-
column_ids[i] = column_id;
351-
}
352-
materialize_block_inplace(*block, column_ids.data(),
353-
column_ids.data() + _input_exprs_ctxs.size());
354-
for (int i = 0; i < _input_exprs_ctxs.size(); ++i) {
355-
_agg_columns[i] = block->get_by_position(column_ids[i]).column.get();
354+
RETURN_IF_ERROR(_input_exprs_ctxs[i]->execute(block, _agg_input_columns[i]));
355+
_agg_input_columns[i] = _agg_input_columns[i]->convert_to_full_column_if_const();
356+
_agg_raw_input_columns[i] = _agg_input_columns[i].get();
356357
}
357358
return Status::OK();
358359
}
359360

361+
void AggFnEvaluator::_reset_input_columns() {
362+
SCOPED_TIMER(_expr_timer);
363+
for (auto& col : _agg_input_columns) {
364+
col = nullptr;
365+
}
366+
for (auto& col : _agg_raw_input_columns) {
367+
col = nullptr;
368+
}
369+
}
370+
360371
AggFnEvaluator* AggFnEvaluator::clone(RuntimeState* state, ObjectPool* pool) {
361372
return pool->add(AggFnEvaluator::create_unique(*this, state).release());
362373
}
@@ -374,7 +385,8 @@ AggFnEvaluator::AggFnEvaluator(AggFnEvaluator& evaluator, RuntimeState* state)
374385
_data_type(evaluator._data_type),
375386
_function(evaluator._function),
376387
_expr_name(evaluator._expr_name),
377-
_agg_columns(evaluator._agg_columns) {
388+
_agg_input_columns(evaluator._agg_input_columns),
389+
_agg_raw_input_columns(evaluator._agg_raw_input_columns) {
378390
if (evaluator._fn.binary_type == TFunctionBinaryType::JAVA_UDF) {
379391
DataTypes tmp_argument_types;
380392
tmp_argument_types.reserve(evaluator._input_exprs_ctxs.size());

be/src/exprs/vectorized_agg_fn.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,9 @@ class AggFnEvaluator {
127127
_without_key(without_key),
128128
_is_window_function(is_window_function) {};
129129
#endif
130-
Status _calc_argument_columns(Block* block);
130+
Status _calc_argument_columns(const Block* block);
131+
132+
void _reset_input_columns();
131133

132134
DataTypes _argument_types_with_sort;
133135
DataTypes _real_argument_types;
@@ -149,7 +151,9 @@ class AggFnEvaluator {
149151

150152
std::string _expr_name;
151153

152-
std::vector<const IColumn*> _agg_columns;
154+
Columns _agg_input_columns;
155+
156+
std::vector<const IColumn*> _agg_raw_input_columns;
153157
};
154158

155159
#include "common/compile_check_end.h"

0 commit comments

Comments
 (0)