Skip to content

Commit b83e565

Browse files
committed
refactor of RuntimeFilterSlots
1 parent 1a8372e commit b83e565

12 files changed

+230
-272
lines changed

be/src/exprs/runtime_filter.h

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,13 @@
2727
#include <vector>
2828

2929
#include "common/status.h"
30-
#include "runtime/decimalv2_value.h"
3130
#include "runtime/define_primitive_type.h"
32-
#include "runtime/large_int_value.h"
33-
#include "runtime/primitive_type.h"
3431
#include "runtime/query_context.h"
3532
#include "runtime/runtime_state.h"
36-
#include "runtime/types.h"
3733
#include "util/runtime_profile.h"
3834
#include "util/time.h"
39-
#include "util/uid_util.h"
40-
#include "vec/common/string_ref.h"
41-
#include "vec/core/types.h"
4235
#include "vec/data_types/data_type.h"
4336
#include "vec/exprs/vexpr.h"
44-
#include "vec/runtime/vdatetime_value.h"
4537

4638
namespace butil {
4739
class IOBufAsZeroCopyInputStream;
@@ -222,7 +214,7 @@ class IRuntimeFilter {
222214

223215
// publish filter
224216
// push filter to remote node or push down it to scan_node
225-
Status publish(RuntimeState* state, bool publish_local = false);
217+
Status publish(RuntimeState* state, bool publish_local);
226218

227219
Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
228220

be/src/exprs/runtime_filter_slots.cpp

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "exprs/runtime_filter_slots.h"
19+
20+
#include "pipeline/pipeline_task.h"
21+
22+
namespace doris {
23+
24+
// use synced size when this rf has global merged
25+
static uint64_t get_real_size(IRuntimeFilter* filter, uint64_t hash_table_size) {
26+
return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size;
27+
}
28+
29+
Status RuntimeFilterSlots::send_filter_size(
30+
RuntimeState* state, uint64_t hash_table_size,
31+
std::shared_ptr<pipeline::CountedFinishDependency> dependency) {
32+
if (_runtime_filters_disabled) {
33+
return Status::OK();
34+
}
35+
for (auto runtime_filter : _runtime_filters) {
36+
if (runtime_filter->need_sync_filter_size()) {
37+
runtime_filter->set_finish_dependency(dependency);
38+
}
39+
}
40+
41+
// send_filter_size may call dependency->sub(), so we call set_finish_dependency firstly for all rf to avoid dependency set_ready repeatedly
42+
for (auto runtime_filter : _runtime_filters) {
43+
if (runtime_filter->need_sync_filter_size()) {
44+
RETURN_IF_ERROR(runtime_filter->send_filter_size(state, hash_table_size));
45+
}
46+
}
47+
return Status::OK();
48+
}
49+
50+
/**
51+
Disable meaningless filters, such as filters:
52+
RF1: col1 in (1, 3, 5)
53+
RF2: col1 min: 1, max: 5
54+
We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter.
55+
*/
56+
Status RuntimeFilterSlots::_disable_meaningless_filters(RuntimeState* state) {
57+
// process ignore duplicate IN_FILTER
58+
std::unordered_set<int> has_in_filter;
59+
for (auto filter : _runtime_filters) {
60+
if (filter->get_ignored() || filter->get_disabled()) {
61+
continue;
62+
}
63+
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
64+
continue;
65+
}
66+
if (!filter->need_sync_filter_size() &&
67+
filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
68+
continue;
69+
}
70+
if (has_in_filter.contains(filter->expr_order())) {
71+
filter->set_disabled();
72+
continue;
73+
}
74+
has_in_filter.insert(filter->expr_order());
75+
}
76+
77+
// process ignore filter when it has IN_FILTER on same expr
78+
for (auto filter : _runtime_filters) {
79+
if (filter->get_ignored() || filter->get_disabled()) {
80+
continue;
81+
}
82+
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
83+
!has_in_filter.contains(filter->expr_order())) {
84+
continue;
85+
}
86+
filter->set_disabled();
87+
}
88+
return Status::OK();
89+
}
90+
91+
Status RuntimeFilterSlots::_init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
92+
// process IN_OR_BLOOM_FILTER's real type
93+
for (auto filter : _runtime_filters) {
94+
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
95+
get_real_size(filter.get(), local_hash_table_size) >
96+
state->runtime_filter_max_in_num()) {
97+
RETURN_IF_ERROR(filter->change_to_bloom_filter());
98+
}
99+
100+
if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
101+
RETURN_IF_ERROR(
102+
filter->init_bloom_filter(get_real_size(filter.get(), local_hash_table_size)));
103+
}
104+
}
105+
return Status::OK();
106+
}
107+
108+
void RuntimeFilterSlots::_insert(const vectorized::Block* block, size_t start) {
109+
SCOPED_TIMER(_runtime_filter_compute_timer);
110+
for (auto& filter : _runtime_filters) {
111+
int result_column_id =
112+
_build_expr_context[filter->expr_order()]->get_last_result_column_id();
113+
const auto& column = block->get_by_position(result_column_id).column;
114+
if (filter->get_ignored() || filter->get_disabled()) {
115+
continue;
116+
}
117+
filter->insert_batch(column, start);
118+
}
119+
}
120+
121+
Status RuntimeFilterSlots::process(
122+
RuntimeState* state, const vectorized::Block* block,
123+
std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency) {
124+
if (_runtime_filters_disabled) {
125+
return Status::OK();
126+
}
127+
if (state->get_task()->wake_up_early()) {
128+
// partitial ignore rf to make global rf work
129+
for (auto filter : _runtime_filters) {
130+
filter->set_ignored();
131+
}
132+
} else if (_should_build_hash_table) {
133+
uint64_t hash_table_size = block ? block->rows() : 0;
134+
{
135+
RETURN_IF_ERROR(_init_filters(state, hash_table_size));
136+
RETURN_IF_ERROR(_disable_meaningless_filters(state));
137+
}
138+
if (hash_table_size > 1) {
139+
_insert(block, 1);
140+
}
141+
}
142+
RETURN_IF_ERROR(_publish(state));
143+
return Status::OK();
144+
}
145+
146+
} // namespace doris

be/src/exprs/runtime_filter_slots.h

Lines changed: 45 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -17,143 +17,50 @@
1717

1818
#pragma once
1919

20-
#include "common/exception.h"
2120
#include "common/status.h"
2221
#include "exprs/runtime_filter.h"
2322
#include "runtime/runtime_filter_mgr.h"
2423
#include "runtime/runtime_state.h"
25-
#include "vec/columns/column_nullable.h"
26-
#include "vec/columns/columns_number.h"
27-
#include "vec/common/assert_cast.h"
2824
#include "vec/core/block.h" // IWYU pragma: keep
2925
#include "vec/exprs/vexpr_context.h"
3026
#include "vec/runtime/shared_hash_table_controller.h"
3127

3228
namespace doris {
3329
// this class used in hash join node
34-
class VRuntimeFilterSlots {
30+
class RuntimeFilterSlots {
3531
public:
36-
VRuntimeFilterSlots(
37-
const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs,
38-
const std::vector<std::shared_ptr<IRuntimeFilter>>& runtime_filters)
39-
: _build_expr_context(build_expr_ctxs), _runtime_filters(runtime_filters) {}
40-
41-
Status send_filter_size(RuntimeState* state, uint64_t hash_table_size,
42-
std::shared_ptr<pipeline::CountedFinishDependency> dependency) {
43-
if (_runtime_filters.empty()) {
44-
return Status::OK();
45-
}
46-
for (auto runtime_filter : _runtime_filters) {
47-
if (runtime_filter->need_sync_filter_size()) {
48-
runtime_filter->set_finish_dependency(dependency);
49-
}
50-
}
51-
52-
// send_filter_size may call dependency->sub(), so we call set_finish_dependency firstly for all rf to avoid dependency set_ready repeatedly
53-
for (auto runtime_filter : _runtime_filters) {
54-
if (runtime_filter->need_sync_filter_size()) {
55-
RETURN_IF_ERROR(runtime_filter->send_filter_size(state, hash_table_size));
56-
}
57-
}
58-
return Status::OK();
59-
}
60-
61-
// use synced size when this rf has global merged
62-
static uint64_t get_real_size(IRuntimeFilter* filter, uint64_t hash_table_size) {
63-
return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size;
32+
RuntimeFilterSlots(const vectorized::VExprContextSPtrs& build_expr_ctxs,
33+
RuntimeProfile* profile,
34+
const std::vector<std::shared_ptr<IRuntimeFilter>>& runtime_filters,
35+
bool should_build_hash_table)
36+
: _build_expr_context(build_expr_ctxs),
37+
_runtime_filters(runtime_filters),
38+
_should_build_hash_table(should_build_hash_table),
39+
_runtime_filters_profile(new RuntimeProfile("RuntimeFilterSlots")) {
40+
profile->add_child(_runtime_filters_profile.get(), true, nullptr);
41+
_publish_runtime_filter_timer =
42+
ADD_TIMER_WITH_LEVEL(_runtime_filters_profile, "PublishTime", 1);
43+
_runtime_filter_compute_timer =
44+
ADD_TIMER_WITH_LEVEL(_runtime_filters_profile, "BuildTime", 1);
6445
}
6546

66-
/**
67-
Disable meaningless filters, such as filters:
68-
RF1: col1 in (1, 3, 5)
69-
RF2: col1 min: 1, max: 5
70-
We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter.
71-
*/
72-
Status disable_meaningless_filters(RuntimeState* state) {
73-
// process ignore duplicate IN_FILTER
74-
std::unordered_set<int> has_in_filter;
75-
for (auto filter : _runtime_filters) {
76-
if (filter->get_ignored() || filter->get_disabled()) {
77-
continue;
78-
}
79-
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
80-
continue;
81-
}
82-
if (!filter->need_sync_filter_size() &&
83-
filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
84-
continue;
85-
}
86-
if (has_in_filter.contains(filter->expr_order())) {
87-
filter->set_disabled();
88-
continue;
89-
}
90-
has_in_filter.insert(filter->expr_order());
91-
}
92-
93-
// process ignore filter when it has IN_FILTER on same expr
94-
for (auto filter : _runtime_filters) {
95-
if (filter->get_ignored() || filter->get_disabled()) {
96-
continue;
97-
}
98-
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
99-
!has_in_filter.contains(filter->expr_order())) {
100-
continue;
101-
}
102-
filter->set_disabled();
103-
}
104-
return Status::OK();
105-
}
106-
107-
Status ignore_all_filters() {
108-
for (auto filter : _runtime_filters) {
109-
filter->set_ignored();
110-
}
111-
return Status::OK();
112-
}
47+
Status send_filter_size(RuntimeState* state, uint64_t hash_table_size,
48+
std::shared_ptr<pipeline::CountedFinishDependency> dependency);
11349

114-
Status disable_all_filters() {
50+
Status disable_all_filters(
51+
RuntimeState* state,
52+
std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency) {
53+
RETURN_IF_ERROR(send_filter_size(state, 0, finish_dependency));
11554
for (auto filter : _runtime_filters) {
11655
filter->set_disabled();
11756
}
57+
RETURN_IF_ERROR(_publish(state));
58+
_runtime_filters_disabled = true;
11859
return Status::OK();
11960
}
12061

121-
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
122-
// process IN_OR_BLOOM_FILTER's real type
123-
for (auto filter : _runtime_filters) {
124-
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
125-
get_real_size(filter.get(), local_hash_table_size) >
126-
state->runtime_filter_max_in_num()) {
127-
RETURN_IF_ERROR(filter->change_to_bloom_filter());
128-
}
129-
130-
if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
131-
RETURN_IF_ERROR(filter->init_bloom_filter(
132-
get_real_size(filter.get(), local_hash_table_size)));
133-
}
134-
}
135-
return Status::OK();
136-
}
137-
138-
void insert(const vectorized::Block* block) {
139-
for (auto& filter : _runtime_filters) {
140-
int result_column_id =
141-
_build_expr_context[filter->expr_order()]->get_last_result_column_id();
142-
const auto& column = block->get_by_position(result_column_id).column;
143-
if (filter->get_ignored() || filter->get_disabled()) {
144-
continue;
145-
}
146-
filter->insert_batch(column, 1);
147-
}
148-
}
149-
150-
// publish runtime filter
151-
Status publish(RuntimeState* state, bool publish_local) {
152-
for (auto& filter : _runtime_filters) {
153-
RETURN_IF_ERROR(filter->publish(state, publish_local));
154-
}
155-
return Status::OK();
156-
}
62+
Status process(RuntimeState* state, const vectorized::Block* block,
63+
std::shared_ptr<pipeline::CountedFinishDependency> finish_dependency);
15764

15865
void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) {
15966
for (auto& filter : _runtime_filters) {
@@ -173,11 +80,30 @@ class VRuntimeFilterSlots {
17380
return Status::OK();
17481
}
17582

176-
bool empty() { return _runtime_filters.empty(); }
83+
protected:
84+
Status _disable_meaningless_filters(RuntimeState* state);
85+
Status _init_filters(RuntimeState* state, uint64_t local_hash_table_size);
86+
void _insert(const vectorized::Block* block, size_t start);
87+
Status _publish(RuntimeState* state) {
88+
if (_runtime_filters_disabled) {
89+
return Status::OK();
90+
}
91+
SCOPED_TIMER(_publish_runtime_filter_timer);
92+
for (auto& filter : _runtime_filters) {
93+
RETURN_IF_ERROR(filter->publish(state, !_should_build_hash_table));
94+
}
95+
return Status::OK();
96+
}
17797

178-
private:
17998
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context;
18099
std::vector<std::shared_ptr<IRuntimeFilter>> _runtime_filters;
100+
bool _should_build_hash_table;
101+
102+
RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
103+
RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
104+
std::unique_ptr<RuntimeProfile> _runtime_filters_profile;
105+
106+
bool _runtime_filters_disabled = false;
181107
};
182108

183109
} // namespace doris

0 commit comments

Comments
 (0)