Skip to content

Commit

Permalink
rename consumer
Browse files Browse the repository at this point in the history
update

fix

update

fix
  • Loading branch information
BiteTheDDDDt committed Feb 8, 2025
1 parent 3fb17b5 commit 207d247
Show file tree
Hide file tree
Showing 28 changed files with 782 additions and 643 deletions.
5 changes: 3 additions & 2 deletions be/src/exprs/create_predicate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ typename Traits::BasePtr create_predicate_function(PrimitiveType type) {
APPLY_FOR_PRIMTYPE(M)
#undef M
default:
throw Status::InternalError("predicate with type " + type_to_string(type));
throw Exception(ErrorCode::INTERNAL_ERROR, "predicate with type " + type_to_string(type));
}

return nullptr;
Expand All @@ -150,7 +150,8 @@ typename Traits::BasePtr create_bitmap_predicate_function(PrimitiveType type) {
case TYPE_BIGINT:
return Creator::template create<TYPE_BIGINT>();
default:
throw Status::InternalError("bitmap predicate with type " + type_to_string(type));
throw Exception(ErrorCode::INTERNAL_ERROR,
"bitmap predicate with type " + type_to_string(type));
}

return nullptr;
Expand Down
390 changes: 12 additions & 378 deletions be/src/exprs/runtime_filter/runtime_filter.cpp

Large diffs are not rendered by default.

127 changes: 15 additions & 112 deletions be/src/exprs/runtime_filter/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "exprs/runtime_filter/utils.h"
#include "pipeline/dependency.h"
#include "runtime/query_context.h"
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
class RuntimePredicateWrapper;
Expand All @@ -38,45 +37,12 @@ class RuntimeProfile;
/// that can be pushed down to node based on the results of the right table.
class RuntimeFilter {
public:
static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const RuntimeFilterRole role, int node_id,
std::shared_ptr<RuntimeFilter>* res);

RuntimeFilterContextSPtr& get_shared_context_ref();

// insert data to build filter
void insert_batch(vectorized::ColumnPtr column, size_t start);

// publish filter
// push filter to remote node or push down it to scan_node
Status publish(RuntimeState* state, bool publish_local);

Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);

RuntimeFilterType type() const { return _runtime_filter_type; }

Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
std::vector<vectorized::VRuntimeFilterPtr>& push_exprs,
bool is_late_arrival);

bool has_local_target() const { return _has_local_target; }

RuntimeFilterState current_state() const { return _rf_state.load(std::memory_order_acquire); }

int expr_order() const { return _expr_order; }

void update_state();
// this function will be called if a runtime filter sent by rpc
// it will notify all wait threads
void signal();

Status merge_from(const RuntimeFilter* other);

Status init_with_size(size_t local_size);

void update_filter(const RuntimeFilter* other, int64_t merge_time, int64_t start_apply,
uint64_t local_merge_time);

void set_ignored();

bool get_ignored();
Expand All @@ -86,37 +52,7 @@ class RuntimeFilter {

RuntimeFilterType get_real_type();

bool need_sync_filter_size();

// async push runtimefilter to remote node
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
uint64_t local_merge_time);

void init_profile(RuntimeProfile* parent_profile);

std::string debug_string() const;

void update_runtime_filter_type_to_profile(uint64_t local_merge_time);

int filter_id() const { return _filter_id; }

std::shared_ptr<pipeline::RuntimeFilterTimer> create_filter_timer(
std::shared_ptr<pipeline::RuntimeFilterDependency> dependencie);

std::string formatted_state() const;

void set_synced_size(uint64_t global_size);

void set_finish_dependency(
const std::shared_ptr<pipeline::CountedFinishDependency>& dependency);

int64_t get_synced_size() const {
if (_synced_size == -1 || !_dependency) {
throw Exception(doris::ErrorCode::INTERNAL_ERROR,
"sync filter size meet error, filter: {}", debug_string());
}
return _synced_size;
}
int filter_id() const { return _wrapper->_filter_id; }

template <class T>
Status assign_data_into_wrapper(const T& request, butil::IOBufAsZeroCopyInputStream* data) {
Expand Down Expand Up @@ -149,77 +85,44 @@ class RuntimeFilter {
return Status::OK();
}

private:
RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
RuntimeFilterRole role)
protected:
RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc)
: _state(state),
_filter_id(desc->filter_id),
_is_broadcast_join(desc->is_broadcast_join),
_has_remote_target(desc->has_remote_targets),
_has_local_target(desc->has_local_targets),
_rf_state(RuntimeFilterState::NOT_READY),
_role(role),
_expr_order(desc->expr_order),
_registration_time(MonotonicMillis()),
_runtime_filter_type(get_runtime_filter_type(desc)),
_profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id,
to_string(_runtime_filter_type)))) {
// If bitmap filter is not applied, it will cause the query result to be incorrect
bool wait_infinitely = _state->get_query_ctx()->runtime_filter_wait_infinitely() ||
_runtime_filter_type == RuntimeFilterType::BITMAP_FILTER;
_rf_wait_time_ms = wait_infinitely ? _state->get_query_ctx()->execution_timeout() * 1000
: _state->get_query_ctx()->runtime_filter_wait_time_ms();
_runtime_filter_type(get_runtime_filter_type(desc)) {
DCHECK_NE(_has_remote_target, _has_local_target);
}

Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1);
Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options);

// serialize _wrapper to protobuf
void _to_protobuf(PInFilter* filter);
void _to_protobuf(PMinMaxFilter* filter);

void _set_push_down(bool push_down) { _is_push_down = push_down; }
Status _push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
uint64_t local_merge_time);

std::string _debug_string() const;

RuntimeFilterParamsContext* _state = nullptr;
// _wrapper is a runtime filter function wrapper
std::shared_ptr<RuntimePredicateWrapper> _wrapper;
// runtime filter id
int _filter_id;
// Specific type: Broadcast or Shuffle
bool _is_broadcast_join;

// will apply to remote node
bool _has_remote_target;
// will apply to local node
bool _has_local_target;
// filter is ready for consumer
std::atomic<RuntimeFilterState> _rf_state;
// role consumer or producer
RuntimeFilterRole _role;
// expr index
int _expr_order;
// used for await or signal
std::mutex _inner_mutex;
std::condition_variable _inner_cv;
bool _is_push_down = false;
TExpr _probe_expr;

/// Time in ms (from MonotonicMillis()), that the filter was registered.
const int64_t _registration_time;
int32_t _rf_wait_time_ms;

std::atomic<bool> _profile_init = false;

// runtime filter type
RuntimeFilterType _runtime_filter_type;
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile::Counter* _wait_timer = nullptr;

std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;

int64_t _synced_size = -1;
std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
friend class RuntimeFilterProducer;
friend class RuntimeFilterConsumer;
friend class RuntimeFilterMerger;
};

} // namespace doris
129 changes: 129 additions & 0 deletions be/src/exprs/runtime_filter/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exprs/runtime_filter/runtime_filter_consumer.h"

#include "exprs/bitmapfilter_predicate.h"

namespace doris {

/// Appends this filter's push-down expressions to `push_exprs` and attaches
/// profile counters to the expressions added by this call.
///
/// The wrapper is only asked for expressions when it is neither ignored nor
/// disabled; in that case `_is_push_down` becomes `!is_late_arrival`. The
/// "Info" string and the three counters are registered on `_profile` on every
/// call, whether or not any expression was produced.
///
/// @param probe_ctxs      forwarded unchanged to the wrapper's get_push_exprs().
/// @param push_exprs      in/out list; existing entries are left untouched.
/// @param is_late_arrival when true, the filter is not recorded as pushed down.
/// @return the status propagated by RETURN_IF_ERROR if the wrapper fails,
///         otherwise Status::OK().
Status RuntimeFilterConsumer::get_push_expr_ctxs(
        std::list<vectorized::VExprContextSPtr>& probe_ctxs,
        std::vector<vectorized::VRuntimeFilterPtr>& push_exprs, bool is_late_arrival) {
    // Remember where the caller's entries end so only new ones get counters below.
    const auto first_new_expr = push_exprs.size();

    const bool filter_usable = !(_wrapper->is_ignored() || _wrapper->is_disabled());
    if (filter_usable) {
        _is_push_down = !is_late_arrival;
        RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
    }
    _profile->add_info_string("Info", formatted_state());

    // Filtering statistics shared by every expression this filter pushed down.
    auto* filtered_rows = ADD_COUNTER(_profile, "ExprFilteredRows", TUnit::UNIT);
    auto* input_rows = ADD_COUNTER(_profile, "ExprInputRows", TUnit::UNIT);
    auto* always_true_rows = ADD_COUNTER(_profile, "AlwaysTruePassRows", TUnit::UNIT);
    for (auto idx = first_new_expr; idx < push_exprs.size(); ++idx) {
        push_exprs[idx]->attach_profile_counter(filtered_rows, input_rows, always_true_rows);
    }
    return Status::OK();
}

/// Renders a one-line, bracketed summary of this consumer for the profile:
/// id, push-down flag, state, remote/local target flags, the wrapper context's
/// ignored/disabled flags, the wrapper's real filter type and the wait time in ms.
std::string RuntimeFilterConsumer::formatted_state() const {
    // The ignored/disabled flags are read from the wrapper's context.
    const auto& context = _wrapper->_context;
    return fmt::format(
            "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, "
            "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]",
            filter_id(), _is_push_down, to_string(_rf_state), _has_remote_target,
            _has_local_target, context->ignored, context->disabled, _wrapper->get_real_type(),
            _rf_wait_time_ms);
}

/// Attaches this filter's profile as a child of `parent_profile`.
///
/// The child is added on every call. Only the first call (while `_profile_init`
/// is still false) additionally records the "Info" string and creates the
/// "WaitTime" timer.
///
/// @param parent_profile profile that receives `_profile` as a child; it is
///                       dereferenced unconditionally, so it must not be null.
void RuntimeFilterConsumer::init_profile(RuntimeProfile* parent_profile) {
    const bool first_call = !_profile_init;
    if (first_call) {
        // Flip the flag before touching the profile, as the one-time setup expects.
        _profile_init = true;
    }
    parent_profile->add_child(_profile.get(), true, nullptr);
    if (!first_call) {
        return;
    }
    _profile->add_info_string("Info", formatted_state());
    _wait_timer = ADD_TIMER(_profile, "WaitTime");
}

/// Marks the filter READY, wakes every registered timer and records the
/// filter's size details in the profile.
///
/// Steps, in order: validate the bloom filter, set the wait timer to the time
/// elapsed since registration, store READY into `_rf_state`, call
/// `call_ready()` on each timer in `_filter_timer`, then add a size info
/// string matching the wrapper's real filter type.
///
/// @throws Exception (INTERNAL_ERROR) when the wrapper is an active (neither
///         ignored nor disabled) bloom filter that has not been initialised.
void RuntimeFilterConsumer::signal() {
    const bool in_use = !(_wrapper->is_ignored() || _wrapper->is_disabled());
    if (in_use && _wrapper->is_bloomfilter() && !_wrapper->get_bloomfilter()->inited()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored/disabled, rf: {}",
                        debug_string());
    }

    // The wait timer is set from elapsed milliseconds scaled by NANOS_PER_MILLIS.
    const auto waited_ms = MonotonicMillis() - _registration_time;
    COUNTER_SET(_wait_timer, int64_t(waited_ms * NANOS_PER_MILLIS));
    _rf_state.store(RuntimeFilterState::READY);

    // NOTE(review): _filter_timer is read here without _inner_mutex while
    // create_filter_timer() appends under it — confirm callers serialise these.
    for (auto& timer : _filter_timer) {
        timer->call_ready();
    }

    // Exactly one of the branches below can match the real filter type.
    const auto real_type = _wrapper->get_real_type();
    if (real_type == RuntimeFilterType::IN_FILTER) {
        _profile->add_info_string("InFilterSize", std::to_string(_wrapper->get_in_filter_size()));
    } else if (real_type == RuntimeFilterType::BITMAP_FILTER) {
        auto bitmap = _wrapper->get_bitmap_filter();
        _profile->add_info_string("BitmapSize", std::to_string(bitmap->size()));
        _profile->add_info_string("IsNotIn", bitmap->is_not_in() ? "true" : "false");
    } else if (real_type == RuntimeFilterType::BLOOM_FILTER) {
        _profile->add_info_string("BloomFilterSize",
                                  std::to_string(_wrapper->get_bloom_filter_size()));
    }
}

/// Adopts the wrapper of `other`, records timing information in the profile
/// and signals that the filter is ready.
///
/// @param other            filter whose `_wrapper` this consumer takes over;
///                         dereferenced unconditionally, so it must not be null.
/// @param merge_time       value reported as "MergeTime" (suffixed with " ms").
/// @param start_apply      MonotonicMillis() timestamp; the difference to now is
///                         reported as "UpdateTime".
/// @param local_merge_time forwarded to update_runtime_filter_type_to_profile().
void RuntimeFilterConsumer::update_filter(const RuntimeFilter* other, int64_t merge_time,
                                          int64_t start_apply, uint64_t local_merge_time) {
    const std::string update_cost = std::to_string(MonotonicMillis() - start_apply) + " ms";
    _profile->add_info_string("UpdateTime", update_cost);

    const std::string merge_cost = std::to_string(merge_time) + " ms";
    _profile->add_info_string("MergeTime", merge_cost);

    // Swap in the incoming wrapper before the profile update and signal() read it.
    _wrapper = other->_wrapper;
    update_runtime_filter_type_to_profile(local_merge_time);
    signal();
}

/// Creates a RuntimeFilterTimer for `dependencie`, remembers it in
/// `_filter_timer` and returns it.
///
/// The timer is constructed from this filter's registration time and wait time.
/// The append to `_filter_timer` is done while holding `_inner_mutex`.
///
/// @param dependencie dependency handed to the new timer.
/// @return the newly created timer (also retained by this consumer).
std::shared_ptr<pipeline::RuntimeFilterTimer> RuntimeFilterConsumer::create_filter_timer(
        std::shared_ptr<pipeline::RuntimeFilterDependency> dependencie) {
    std::shared_ptr<pipeline::RuntimeFilterTimer> new_timer =
            std::make_shared<pipeline::RuntimeFilterTimer>(_registration_time, _rf_wait_time_ms,
                                                           dependencie);
    // The lock is held until return, matching the scope of the original guard.
    std::lock_guard guard(_inner_mutex);
    _filter_timer.push_back(new_timer);
    return new_timer;
}

/// Moves the filter from NOT_READY to TIMEOUT; does nothing in any other state.
///
/// On the NOT_READY path the wait timer is set to the time elapsed since
/// registration, and a debug check asserts that at least the expected wait time
/// has passed (query execution timeout for bitmap filters, the configured
/// runtime-filter wait time otherwise).
///
/// The transition uses compare-and-swap so that a READY stored concurrently by
/// signal() between the load and the write is not overwritten with TIMEOUT.
void RuntimeFilterConsumer::update_state() {
    auto execution_timeout = _state->get_query_ctx()->execution_timeout() * 1000;
    auto runtime_filter_wait_time_ms = _state->get_query_ctx()->runtime_filter_wait_time_ms();
    // bitmap filter is precise filter and only filter once, so it must be applied.
    // wait_times_ms is only consumed by the DCHECK below.
    int64_t wait_times_ms = _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER
                                    ? execution_timeout
                                    : runtime_filter_wait_time_ms;

    auto expected = _rf_state.load(std::memory_order_acquire);
    // In pipelineX, runtime filters will be ready or timeout before open phase.
    if (expected != RuntimeFilterState::NOT_READY) {
        return;
    }

    DCHECK(MonotonicMillis() - _registration_time >= wait_times_ms);
    COUNTER_SET(_wait_timer,
                int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS));
    // Only replace the state if it is still NOT_READY; a failed exchange means
    // another thread already moved the filter on, and that state must be kept.
    _rf_state.compare_exchange_strong(expected, RuntimeFilterState::TIMEOUT);
}

} // namespace doris
Loading

0 comments on commit 207d247

Please sign in to comment.