From 207d247b9dc3df363717f3aaebf3a93ee64f4eeb Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Sat, 8 Feb 2025 13:44:11 +0800 Subject: [PATCH] rename consumer update fix update fix --- be/src/exprs/create_predicate_function.h | 5 +- .../exprs/runtime_filter/runtime_filter.cpp | 390 +----------------- be/src/exprs/runtime_filter/runtime_filter.h | 127 +----- .../runtime_filter_consumer.cpp | 129 ++++++ .../runtime_filter/runtime_filter_consumer.h | 98 +++++ .../runtime_filter/runtime_filter_merger.h | 52 +++ .../runtime_filter_producer.cpp | 243 +++++++++++ .../runtime_filter/runtime_filter_producer.h | 88 ++++ .../runtime_filter/runtime_filter_slots.h | 5 +- .../runtime_filter_slots_cross.h | 8 +- .../runtime_filter/runtime_filter_wrapper.h | 12 + ...p => runtime_filter_consumer_operator.cpp} | 35 +- ...r.h => runtime_filter_consumer_operator.h} | 19 +- be/src/pipeline/dependency.cpp | 2 +- be/src/pipeline/dependency.h | 5 +- be/src/pipeline/exec/datagen_operator.cpp | 8 +- be/src/pipeline/exec/es_scan_operator.cpp | 10 +- .../pipeline/exec/join_build_sink_operator.h | 4 +- .../exec/multi_cast_data_stream_source.cpp | 10 +- .../exec/multi_cast_data_stream_source.h | 5 +- be/src/pipeline/exec/olap_scan_operator.cpp | 6 +- be/src/pipeline/exec/scan_operator.cpp | 6 +- be/src/pipeline/exec/scan_operator.h | 8 +- be/src/runtime/fragment_mgr.cpp | 6 +- be/src/runtime/runtime_filter_mgr.cpp | 89 ++-- be/src/runtime/runtime_filter_mgr.h | 34 +- be/src/runtime/runtime_state.cpp | 8 +- be/src/runtime/runtime_state.h | 13 +- 28 files changed, 782 insertions(+), 643 deletions(-) create mode 100644 be/src/exprs/runtime_filter/runtime_filter_consumer.cpp create mode 100644 be/src/exprs/runtime_filter/runtime_filter_consumer.h create mode 100644 be/src/exprs/runtime_filter/runtime_filter_merger.h create mode 100644 be/src/exprs/runtime_filter/runtime_filter_producer.cpp create mode 100644 be/src/exprs/runtime_filter/runtime_filter_producer.h rename be/src/pipeline/common/{runtime_filter_consumer.cpp => runtime_filter_consumer_operator.cpp} (83%) rename be/src/pipeline/common/{runtime_filter_consumer.h => runtime_filter_consumer_operator.h} (82%) diff --git a/be/src/exprs/create_predicate_function.h b/be/src/exprs/create_predicate_function.h index 593acd0f5d864e..7c702ad4ce6a2c 100644 --- a/be/src/exprs/create_predicate_function.h +++ b/be/src/exprs/create_predicate_function.h @@ -130,7 +130,7 @@ typename Traits::BasePtr create_predicate_function(PrimitiveType type) { APPLY_FOR_PRIMTYPE(M) #undef M default: - throw Status::InternalError("predicate with type " + type_to_string(type)); + throw Exception(ErrorCode::INTERNAL_ERROR, "predicate with type " + type_to_string(type)); } return nullptr; @@ -150,7 +150,8 @@ typename Traits::BasePtr create_bitmap_predicate_function(PrimitiveType type) { case TYPE_BIGINT: return Creator::template create(); default: - throw Status::InternalError("bitmap predicate with type " + type_to_string(type)); + throw Exception(ErrorCode::INTERNAL_ERROR, + "bitmap predicate with type " + type_to_string(type)); } return nullptr; diff --git a/be/src/exprs/runtime_filter/runtime_filter.cpp b/be/src/exprs/runtime_filter/runtime_filter.cpp index d28f89b4393d1d..eb633c8f2421b5 100644 --- a/be/src/exprs/runtime_filter/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter/runtime_filter.cpp @@ -22,6 +22,9 @@ #include "exprs/create_predicate_function.h" #include "exprs/hybrid_set.h" #include "exprs/minmax_predicate.h" +#include "exprs/runtime_filter/runtime_filter_consumer.h" +#include "exprs/runtime_filter/runtime_filter_merger.h" +#include "exprs/runtime_filter/runtime_filter_producer.h" #include "util/brpc_client_cache.h" #include "util/ref_count_closure.h" #include "vec/exprs/vbitmap_predicate.h" @@ -32,209 +35,8 @@ namespace doris { -Status RuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, - const RuntimeFilterRole role, int node_id, - std::shared_ptr* res) { - *res = std::shared_ptr(new RuntimeFilter(state, desc, role)); - return (*res)->_init_with_desc(desc, &state->get_query_ctx()->query_options(), node_id); -} - -RuntimeFilterContextSPtr& RuntimeFilter::get_shared_context_ref() { - return _wrapper->_context; -} - -void RuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t start) { - _wrapper->insert_batch(column, start); -} - -Status RuntimeFilter::publish(RuntimeState* state, bool publish_local) { - auto send_to_remote_targets = [&](RuntimeFilter* filter, uint64_t local_merge_time) { - TNetworkAddress addr; - DCHECK(_state != nullptr); - RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr)); - return filter->push_to_remote(state, &addr, local_merge_time); - }; - auto send_to_local_targets = [&](std::shared_ptr wrapper, bool global, - uint64_t local_merge_time = 0) { - std::vector> filters = - global ? _state->global_runtime_filter_mgr()->get_consume_filters(_filter_id) - : _state->local_runtime_filter_mgr()->get_consume_filters(_filter_id); - for (auto filter : filters) { - filter->_wrapper = wrapper; - filter->update_runtime_filter_type_to_profile(local_merge_time); - filter->signal(); - } - return Status::OK(); - }; - auto do_merge = [&]() { - // two case we need do local merge: - // 1. has remote target - // 2. has local target and has global consumer (means target scan has local shuffle) - if (_has_local_target && - _state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) { - // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless - return Status::OK(); - } - LocalMergeFilters* local_merge_filters = nullptr; - RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _filter_id, &local_merge_filters)); - local_merge_filters->merge_watcher.start(); - std::lock_guard l(*local_merge_filters->lock); - RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(this)); - local_merge_filters->merge_time--; - local_merge_filters->merge_watcher.stop(); - if (local_merge_filters->merge_time == 0) { - if (_has_local_target) { - RETURN_IF_ERROR( - send_to_local_targets(local_merge_filters->filters[0]->_wrapper, true, - local_merge_filters->merge_watcher.elapsed_time())); - } else { - RETURN_IF_ERROR( - send_to_remote_targets(local_merge_filters->filters[0].get(), - local_merge_filters->merge_watcher.elapsed_time())); - } - } - return Status::OK(); - }; - - if (_has_local_target) { - // A runtime filter may have multiple targets and some of those are local-merge RF and others are not. - // So for all runtime filters' producers, `publish` should notify all consumers in global RF mgr which manages local-merge RF and local RF mgr which manages others. - RETURN_IF_ERROR(do_merge()); - RETURN_IF_ERROR(send_to_local_targets(_wrapper, false)); - } else if (!publish_local) { - if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() < USE_NEW_SERDE) { - RETURN_IF_ERROR(send_to_remote_targets(this, 0)); - } else { - RETURN_IF_ERROR(do_merge()); - } - } else { - // remote broadcast join only push onetime in build shared hash table - // publish_local only set true on copy shared hash table - DCHECK(_is_broadcast_join); - } - return Status::OK(); -} - -class SyncSizeClosure : public AutoReleaseClosure> { - std::shared_ptr _dependency; - // Should use weak ptr here, because when query context deconstructs, should also delete runtime filter - // context, it not the memory is not released. And rpc is in another thread, it will hold rf context - // after query context because the rpc is not returned. - std::weak_ptr _rf_context; - using Base = - AutoReleaseClosure>; - ENABLE_FACTORY_CREATOR(SyncSizeClosure); - - void _process_if_rpc_failed() override { - Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; - auto ctx = _rf_context.lock(); - if (!ctx) { - return; - } - - ctx->err_msg = cntl_->ErrorText(); - Base::_process_if_rpc_failed(); - } - - void _process_if_meet_error_status(const Status& status) override { - Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; - auto ctx = _rf_context.lock(); - if (!ctx) { - return; - } - - if (status.is()) { - // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case. - ctx->ignored = true; - } else { - ctx->err_msg = status.to_string(); - Base::_process_if_meet_error_status(status); - } - } - -public: - SyncSizeClosure(std::shared_ptr req, - std::shared_ptr> callback, - std::shared_ptr dependency, - RuntimeFilterContextSPtr rf_context, std::weak_ptr context) - : Base(req, callback, context), - _dependency(std::move(dependency)), - _rf_context(rf_context) {} -}; - -Status RuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { - // two case we need do local merge: - // 1. has remote target - // 2. has local target and has global consumer (means target scan has local shuffle) - if (_has_remote_target || - !_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) { - LocalMergeFilters* local_merge_filters = nullptr; - RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _filter_id, &local_merge_filters)); - std::lock_guard l(*local_merge_filters->lock); - local_merge_filters->merge_size_times--; - local_merge_filters->local_merged_size += local_filter_size; - if (local_merge_filters->merge_size_times) { - return Status::OK(); - } else { - if (_has_local_target) { - for (auto filter : local_merge_filters->filters) { - filter->set_synced_size(local_merge_filters->local_merged_size); - } - return Status::OK(); - } else { - local_filter_size = local_merge_filters->local_merged_size; - } - } - } else if (_has_local_target) { - set_synced_size(local_filter_size); - return Status::OK(); - } - - TNetworkAddress addr; - DCHECK(_state != nullptr); - RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr)); - std::shared_ptr stub( - _state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr)); - if (!stub) { - return Status::InternalError("Get rpc stub failed, host={}, port={}", addr.hostname, - addr.port); - } - - auto request = std::make_shared(); - auto callback = DummyBrpcCallback::create_shared(); - // RuntimeFilter maybe deconstructed before the rpc finished, so that could not use - // a raw pointer in closure. Has to use the context's shared ptr. - auto closure = SyncSizeClosure::create_unique( - request, callback, _dependency, _wrapper->_context, - state->query_options().ignore_runtime_filter_error ? std::weak_ptr {} - : state->get_query_ctx_weak()); - auto* pquery_id = request->mutable_query_id(); - pquery_id->set_hi(_state->get_query_ctx()->query_id().hi); - pquery_id->set_lo(_state->get_query_ctx()->query_id().lo); - - auto* source_addr = request->mutable_source_addr(); - source_addr->set_hostname(BackendOptions::get_local_backend().host); - source_addr->set_port(BackendOptions::get_local_backend().brpc_port); - - request->set_filter_size(local_filter_size); - request->set_filter_id(_filter_id); - - callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout())); - if (config::execution_ignore_eovercrowded) { - callback->cntl_->ignore_eovercrowded(); - } - - stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), - closure.get()); - closure.release(); - return Status::OK(); -} - -Status RuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr, - uint64_t local_merge_time) { +Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress* addr, + uint64_t local_merge_time) { std::shared_ptr stub( _state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(*addr)); if (!stub) { @@ -261,7 +63,7 @@ Status RuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* pfragment_instance_id->set_hi(BackendOptions::get_local_backend().id); pfragment_instance_id->set_lo((int64_t)this); - merge_filter_request->set_filter_id(_filter_id); + merge_filter_request->set_filter_id(filter_id()); merge_filter_request->set_local_merge_time(local_merge_time); auto column_type = _wrapper->column_type(); RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type))); @@ -292,94 +94,6 @@ Status RuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* return Status::OK(); } -Status RuntimeFilter::get_push_expr_ctxs(std::list& probe_ctxs, - std::vector& push_exprs, - bool is_late_arrival) { - auto origin_size = push_exprs.size(); - if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) { - _set_push_down(!is_late_arrival); - RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); - } - _profile->add_info_string("Info", formatted_state()); - // The runtime filter is pushed down, adding filtering information. - auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "ExprFilteredRows", TUnit::UNIT); - auto* expr_input_rows_counter = ADD_COUNTER(_profile, "ExprInputRows", TUnit::UNIT); - auto* always_true_counter = ADD_COUNTER(_profile, "AlwaysTruePassRows", TUnit::UNIT); - for (auto i = origin_size; i < push_exprs.size(); i++) { - push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter, - always_true_counter); - } - return Status::OK(); -} - -void RuntimeFilter::update_state() { - auto execution_timeout = _state->get_query_ctx()->execution_timeout() * 1000; - auto runtime_filter_wait_time_ms = _state->get_query_ctx()->runtime_filter_wait_time_ms(); - // bitmap filter is precise filter and only filter once, so it must be applied. - int64_t wait_times_ms = _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER - ? execution_timeout - : runtime_filter_wait_time_ms; - auto expected = _rf_state.load(std::memory_order_acquire); - // In pipelineX, runtime filters will be ready or timeout before open phase. - if (expected == RuntimeFilterState::NOT_READY) { - DCHECK(MonotonicMillis() - _registration_time >= wait_times_ms); - COUNTER_SET(_wait_timer, - int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS)); - _rf_state = RuntimeFilterState::TIMEOUT; - } -} - -void RuntimeFilter::signal() { - if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() && - !_wrapper->get_bloomfilter()->inited()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored/disabled, rf: {}", - debug_string()); - } - - COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS)); - _rf_state.store(RuntimeFilterState::READY); - if (!_filter_timer.empty()) { - for (auto& timer : _filter_timer) { - timer->call_ready(); - } - } - - if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) { - _profile->add_info_string("InFilterSize", std::to_string(_wrapper->get_in_filter_size())); - } - if (_wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER) { - auto bitmap_filter = _wrapper->get_bitmap_filter(); - _profile->add_info_string("BitmapSize", std::to_string(bitmap_filter->size())); - _profile->add_info_string("IsNotIn", bitmap_filter->is_not_in() ? "true" : "false"); - } - if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) { - _profile->add_info_string("BloomFilterSize", - std::to_string(_wrapper->get_bloom_filter_size())); - } -} -std::shared_ptr RuntimeFilter::create_filter_timer( - std::shared_ptr dependencie) { - auto timer = std::make_shared(_registration_time, - _rf_wait_time_ms, dependencie); - std::unique_lock lock(_inner_mutex); - _filter_timer.push_back(timer); - return timer; -} - -void RuntimeFilter::set_finish_dependency( - const std::shared_ptr& dependency) { - _dependency = dependency; - _dependency->add(); - CHECK(_dependency); -} - -void RuntimeFilter::set_synced_size(uint64_t global_size) { - _synced_size = global_size; - if (_dependency) { - _dependency->sub(); - } -} - void RuntimeFilter::set_ignored() { _wrapper->set_ignored(); } @@ -396,25 +110,13 @@ bool RuntimeFilter::get_disabled() const { return _wrapper->is_disabled(); } -std::string RuntimeFilter::formatted_state() const { - return fmt::format( - "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " - "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]", - _filter_id, _is_push_down, to_string(_rf_state), _has_remote_target, _has_local_target, - _wrapper->_context->ignored, _wrapper->_context->disabled, _wrapper->get_real_type(), - _rf_wait_time_ms); -} - -Status RuntimeFilter::_init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id) { - // if node_id == -1 , it shouldn't be a consumer - DCHECK(node_id >= 0 || (node_id == -1 && _role != RuntimeFilterRole::CONSUMER)); - +Status RuntimeFilter::_init_with_desc(const TRuntimeFilterDesc* desc, + const TQueryOptions* options) { vectorized::VExprContextSPtr build_ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(desc->src_expr, build_ctx)); RuntimeFilterParams params; - params.filter_id = _filter_id; + params.filter_id = desc->filter_id; params.filter_type = _runtime_filter_type; params.column_return_type = build_ctx->root()->type().type; params.max_in_num = options->runtime_filter_max_in_num; @@ -455,67 +157,13 @@ Status RuntimeFilter::_init_with_desc(const TRuntimeFilterDesc* desc, const TQue } } - if (node_id >= 0) { - const auto iter = desc->planId_to_target_expr.find(node_id); - if (iter == desc->planId_to_target_expr.end()) { - return Status::InternalError("not found a node id:{}", node_id); - } - _probe_expr = iter->second; - } - _wrapper = std::make_shared(¶ms); return Status::OK(); } -Status RuntimeFilter::init_with_size(size_t local_size) { - size_t real_size = need_sync_filter_size() ? get_synced_size() : local_size; - if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && - local_size > _wrapper->_max_in_num) { - RETURN_IF_ERROR(_wrapper->change_to_bloom_filter()); - } - - if (get_real_type() == RuntimeFilterType::BLOOM_FILTER) { - RETURN_IF_ERROR(_wrapper->init_bloom_filter(real_size)); - } - return Status::OK(); -} - -void RuntimeFilter::init_profile(RuntimeProfile* parent_profile) { - if (_profile_init) { - parent_profile->add_child(_profile.get(), true, nullptr); - } else { - _profile_init = true; - parent_profile->add_child(_profile.get(), true, nullptr); - _profile->add_info_string("Info", formatted_state()); - _wait_timer = ADD_TIMER(_profile, "WaitTime"); - } -} - -void RuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_time) { - _profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type())); - _profile->add_info_string("LocalMergeTime", - std::to_string((double)local_merge_time / NANOS_PER_SEC) + " s"); -} - -std::string RuntimeFilter::debug_string() const { - return fmt::format( - "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, disabled: {}, " - "build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, " - "has_remote_target: {}, error_msg: [{}]", - _filter_id, to_string(_runtime_filter_type), _is_broadcast_join, - _wrapper->_context->ignored, _wrapper->_context->disabled, - _wrapper->get_build_bf_cardinality(), - _dependency ? _dependency->debug_string() : "none", _synced_size, _has_local_target, - _has_remote_target, _wrapper->_context->err_msg); -} - -Status RuntimeFilter::merge_from(const RuntimeFilter* other) { - auto status = _wrapper->merge(other->_wrapper.get()); - if (!status) { - return Status::InternalError("runtime filter merge failed: {}, error_msg: {}", - debug_string(), status.msg()); - } - return Status::OK(); +std::string RuntimeFilter::_debug_string() const { + return fmt::format("{}, type = {}, mode: {}", _wrapper->debug_string(), + to_string(_runtime_filter_type), _has_local_target ? "LOCAL" : "GLOBAL"); } void RuntimeFilter::_to_protobuf(PInFilter* filter) { @@ -532,18 +180,4 @@ RuntimeFilterType RuntimeFilter::get_real_type() { return _wrapper->get_real_type(); } -bool RuntimeFilter::need_sync_filter_size() { - return _wrapper->get_build_bf_cardinality() && !_is_broadcast_join; -} - -void RuntimeFilter::update_filter(const RuntimeFilter* other, int64_t merge_time, - int64_t start_apply, uint64_t local_merge_time) { - _profile->add_info_string("UpdateTime", - std::to_string(MonotonicMillis() - start_apply) + " ms"); - _profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms"); - _wrapper = other->_wrapper; - update_runtime_filter_type_to_profile(local_merge_time); - signal(); -} - } // namespace doris diff --git a/be/src/exprs/runtime_filter/runtime_filter.h b/be/src/exprs/runtime_filter/runtime_filter.h index f1ba08aa9ccde7..bf702372af53ac 100644 --- a/be/src/exprs/runtime_filter/runtime_filter.h +++ b/be/src/exprs/runtime_filter/runtime_filter.h @@ -25,7 +25,6 @@ #include "exprs/runtime_filter/utils.h" #include "pipeline/dependency.h" #include "runtime/query_context.h" -#include "vec/exprs/vexpr_fwd.h" namespace doris { class RuntimePredicateWrapper; @@ -38,45 +37,12 @@ class RuntimeProfile; /// that can be pushed down to node based on the results of the right table. class RuntimeFilter { public: - static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, - const RuntimeFilterRole role, int node_id, - std::shared_ptr* res); - - RuntimeFilterContextSPtr& get_shared_context_ref(); - - // insert data to build filter - void insert_batch(vectorized::ColumnPtr column, size_t start); - - // publish filter - // push filter to remote node or push down it to scan_node - Status publish(RuntimeState* state, bool publish_local); - - Status send_filter_size(RuntimeState* state, uint64_t local_filter_size); - RuntimeFilterType type() const { return _runtime_filter_type; } - Status get_push_expr_ctxs(std::list& probe_ctxs, - std::vector& push_exprs, - bool is_late_arrival); - bool has_local_target() const { return _has_local_target; } RuntimeFilterState current_state() const { return _rf_state.load(std::memory_order_acquire); } - int expr_order() const { return _expr_order; } - - void update_state(); - // this function will be called if a runtime filter sent by rpc - // it will notify all wait threads - void signal(); - - Status merge_from(const RuntimeFilter* other); - - Status init_with_size(size_t local_size); - - void update_filter(const RuntimeFilter* other, int64_t merge_time, int64_t start_apply, - uint64_t local_merge_time); - void set_ignored(); bool get_ignored(); @@ -86,37 +52,7 @@ class RuntimeFilter { RuntimeFilterType get_real_type(); - bool need_sync_filter_size(); - - // async push runtimefilter to remote node - Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, - uint64_t local_merge_time); - - void init_profile(RuntimeProfile* parent_profile); - - std::string debug_string() const; - - void update_runtime_filter_type_to_profile(uint64_t local_merge_time); - - int filter_id() const { return _filter_id; } - - std::shared_ptr create_filter_timer( - std::shared_ptr dependencie); - - std::string formatted_state() const; - - void set_synced_size(uint64_t global_size); - - void set_finish_dependency( - const std::shared_ptr& dependency); - - int64_t get_synced_size() const { - if (_synced_size == -1 || !_dependency) { - throw Exception(doris::ErrorCode::INTERNAL_ERROR, - "sync filter size meet error, filter: {}", debug_string()); - } - return _synced_size; - } + int filter_id() const { return _wrapper->_filter_id; } template Status assign_data_into_wrapper(const T& request, butil::IOBufAsZeroCopyInputStream* data) { @@ -149,77 +85,44 @@ class RuntimeFilter { return Status::OK(); } -private: - RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, - RuntimeFilterRole role) +protected: + RuntimeFilter(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc) : _state(state), - _filter_id(desc->filter_id), - _is_broadcast_join(desc->is_broadcast_join), _has_remote_target(desc->has_remote_targets), _has_local_target(desc->has_local_targets), _rf_state(RuntimeFilterState::NOT_READY), - _role(role), - _expr_order(desc->expr_order), - _registration_time(MonotonicMillis()), - _runtime_filter_type(get_runtime_filter_type(desc)), - _profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", - _filter_id, - to_string(_runtime_filter_type)))) { - // If bitmap filter is not applied, it will cause the query result to be incorrect - bool wait_infinitely = _state->get_query_ctx()->runtime_filter_wait_infinitely() || - _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER; - _rf_wait_time_ms = wait_infinitely ? _state->get_query_ctx()->execution_timeout() * 1000 - : _state->get_query_ctx()->runtime_filter_wait_time_ms(); + _runtime_filter_type(get_runtime_filter_type(desc)) { + DCHECK_NE(_has_remote_target, _has_local_target); } - Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id = -1); + Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options); // serialize _wrapper to protobuf void _to_protobuf(PInFilter* filter); void _to_protobuf(PMinMaxFilter* filter); - void _set_push_down(bool push_down) { _is_push_down = push_down; } + Status _push_to_remote(RuntimeState* state, const TNetworkAddress* addr, + uint64_t local_merge_time); + + std::string _debug_string() const; RuntimeFilterParamsContext* _state = nullptr; // _wrapper is a runtime filter function wrapper std::shared_ptr _wrapper; - // runtime filter id - int _filter_id; - // Specific types BoardCast or Shuffle - bool _is_broadcast_join; + // will apply to remote node bool _has_remote_target; // will apply to local node bool _has_local_target; // filter is ready for consumer std::atomic _rf_state; - // role consumer or producer - RuntimeFilterRole _role; - // expr index - int _expr_order; - // used for await or signal - std::mutex _inner_mutex; - std::condition_variable _inner_cv; - bool _is_push_down = false; - TExpr _probe_expr; - - /// Time in ms (from MonotonicMillis()), that the filter was registered. - const int64_t _registration_time; - int32_t _rf_wait_time_ms; - - std::atomic _profile_init = false; + // runtime filter type RuntimeFilterType _runtime_filter_type; - // parent profile - // only effect on consumer - std::unique_ptr _profile; - RuntimeProfile::Counter* _wait_timer = nullptr; - - std::vector> _filter_timer; - int64_t _synced_size = -1; - std::shared_ptr _dependency; + friend class RuntimeFilterProducer; + friend class RuntimeFilterConsumer; + friend class RuntimeFilterMerger; }; } // namespace doris diff --git a/be/src/exprs/runtime_filter/runtime_filter_consumer.cpp b/be/src/exprs/runtime_filter/runtime_filter_consumer.cpp new file mode 100644 index 00000000000000..5ab6a031c238ab --- /dev/null +++ b/be/src/exprs/runtime_filter/runtime_filter_consumer.cpp @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/runtime_filter/runtime_filter_consumer.h" + +#include "exprs/bitmapfilter_predicate.h" + +namespace doris { + +Status RuntimeFilterConsumer::get_push_expr_ctxs( + std::list& probe_ctxs, + std::vector& push_exprs, bool is_late_arrival) { + auto origin_size = push_exprs.size(); + if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) { + _is_push_down = !is_late_arrival; + RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); + } + _profile->add_info_string("Info", formatted_state()); + // The runtime filter is pushed down, adding filtering information. + auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "ExprFilteredRows", TUnit::UNIT); + auto* expr_input_rows_counter = ADD_COUNTER(_profile, "ExprInputRows", TUnit::UNIT); + auto* always_true_counter = ADD_COUNTER(_profile, "AlwaysTruePassRows", TUnit::UNIT); + for (auto i = origin_size; i < push_exprs.size(); i++) { + push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter, + always_true_counter); + } + return Status::OK(); +} + +std::string RuntimeFilterConsumer::formatted_state() const { + return fmt::format( + "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " + "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]", + filter_id(), _is_push_down, to_string(_rf_state), _has_remote_target, _has_local_target, + _wrapper->_context->ignored, _wrapper->_context->disabled, _wrapper->get_real_type(), + _rf_wait_time_ms); +} + +void RuntimeFilterConsumer::init_profile(RuntimeProfile* parent_profile) { + if (_profile_init) { + parent_profile->add_child(_profile.get(), true, nullptr); + } else { + _profile_init = true; + parent_profile->add_child(_profile.get(), true, nullptr); + _profile->add_info_string("Info", formatted_state()); + _wait_timer = ADD_TIMER(_profile, "WaitTime"); + } +} + +void RuntimeFilterConsumer::signal() { + if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() && + !_wrapper->get_bloomfilter()->inited()) { + throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored/disabled, rf: {}", + debug_string()); + } + + COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS)); + _rf_state.store(RuntimeFilterState::READY); + if (!_filter_timer.empty()) { + for (auto& timer : _filter_timer) { + timer->call_ready(); + } + } + + if (_wrapper->get_real_type() == RuntimeFilterType::IN_FILTER) { + _profile->add_info_string("InFilterSize", std::to_string(_wrapper->get_in_filter_size())); + } + if (_wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER) { + auto bitmap_filter = _wrapper->get_bitmap_filter(); + _profile->add_info_string("BitmapSize", std::to_string(bitmap_filter->size())); + _profile->add_info_string("IsNotIn", bitmap_filter->is_not_in() ? "true" : "false"); + } + if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) { + _profile->add_info_string("BloomFilterSize", + std::to_string(_wrapper->get_bloom_filter_size())); + } +} + +void RuntimeFilterConsumer::update_filter(const RuntimeFilter* other, int64_t merge_time, + int64_t start_apply, uint64_t local_merge_time) { + _profile->add_info_string("UpdateTime", + std::to_string(MonotonicMillis() - start_apply) + " ms"); + _profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms"); + _wrapper = other->_wrapper; + update_runtime_filter_type_to_profile(local_merge_time); + signal(); +} + +std::shared_ptr RuntimeFilterConsumer::create_filter_timer( + std::shared_ptr dependencie) { + auto timer = std::make_shared(_registration_time, + _rf_wait_time_ms, dependencie); + std::unique_lock lock(_inner_mutex); + _filter_timer.push_back(timer); + return timer; +} + +void RuntimeFilterConsumer::update_state() { + auto execution_timeout = _state->get_query_ctx()->execution_timeout() * 1000; + auto runtime_filter_wait_time_ms = _state->get_query_ctx()->runtime_filter_wait_time_ms(); + // bitmap filter is precise filter and only filter once, so it must be applied. + int64_t wait_times_ms = _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER + ? execution_timeout + : runtime_filter_wait_time_ms; + auto expected = _rf_state.load(std::memory_order_acquire); + // In pipelineX, runtime filters will be ready or timeout before open phase. + if (expected == RuntimeFilterState::NOT_READY) { + DCHECK(MonotonicMillis() - _registration_time >= wait_times_ms); + COUNTER_SET(_wait_timer, + int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS)); + _rf_state = RuntimeFilterState::TIMEOUT; + } +} + +} // namespace doris diff --git a/be/src/exprs/runtime_filter/runtime_filter_consumer.h b/be/src/exprs/runtime_filter/runtime_filter_consumer.h new file mode 100644 index 00000000000000..01f2cc00145e9a --- /dev/null +++ b/be/src/exprs/runtime_filter/runtime_filter_consumer.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exprs/runtime_filter/runtime_filter.h" + +namespace doris { + +class RuntimeFilterConsumer : public RuntimeFilter { +public: + static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + int node_id, std::shared_ptr* res) { + *res = std::shared_ptr( + new RuntimeFilterConsumer(state, desc, node_id)); + return (*res)->_init_with_desc(desc, &state->get_query_ctx()->query_options()); + } + + int node_id() const { return _node_id; } + + Status get_push_expr_ctxs(std::list& probe_ctxs, + std::vector& push_exprs, + bool is_late_arrival); + std::string formatted_state() const; + + void init_profile(RuntimeProfile* parent_profile); + + void signal(); + + void update_filter(const RuntimeFilter* other, int64_t merge_time, int64_t start_apply, + uint64_t local_merge_time); + + std::shared_ptr create_filter_timer( + std::shared_ptr dependencie); + + void update_state(); + + std::string debug_string() const { + return fmt::format("RuntimeFilterConsumer: ({})", _debug_string()); + } + + void update_runtime_filter_type_to_profile(uint64_t local_merge_time) { + _profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type())); + _profile->add_info_string("LocalMergeTime", + std::to_string((double)local_merge_time / NANOS_PER_SEC) + " s"); + } + +private: + RuntimeFilterConsumer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + int node_id) + : RuntimeFilter(state, desc), + _node_id(node_id), + _probe_expr(desc->planId_to_target_expr.find(node_id)->second), + _profile(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", + desc->filter_id, + to_string(_runtime_filter_type)))), + _registration_time(MonotonicMillis()) { + // If bitmap filter is not applied, it will cause the query result to be incorrect + bool wait_infinitely = _state->get_query_ctx()->runtime_filter_wait_infinitely() || + _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER; + _rf_wait_time_ms = wait_infinitely ? _state->get_query_ctx()->execution_timeout() * 1000 + : _state->get_query_ctx()->runtime_filter_wait_time_ms(); + } + + int _node_id; + bool _is_push_down = false; + + TExpr _probe_expr; + + std::mutex _inner_mutex; + + std::vector> _filter_timer; + + std::atomic _profile_init = false; + std::unique_ptr _profile; + RuntimeProfile::Counter* _wait_timer = nullptr; + + int32_t _rf_wait_time_ms; + const int64_t _registration_time; + + friend class RuntimeFilterProducer; +}; + +} // namespace doris diff --git a/be/src/exprs/runtime_filter/runtime_filter_merger.h b/be/src/exprs/runtime_filter/runtime_filter_merger.h new file mode 100644 index 00000000000000..c06af1439550e0 --- /dev/null +++ b/be/src/exprs/runtime_filter/runtime_filter_merger.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exprs/runtime_filter/runtime_filter.h" + +namespace doris { + +class RuntimeFilterMerger : public RuntimeFilter { +public: + static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + std::shared_ptr* res) { + *res = std::shared_ptr(new RuntimeFilterMerger(state, desc)); + return (*res)->_init_with_desc(desc, &state->get_query_ctx()->query_options()); + } + + std::string debug_string() const { + return fmt::format("RuntimeFilterConsumer: ({})", _debug_string()); + } + + Status merge_from(const RuntimeFilter* other) { + auto status = _wrapper->merge(other->_wrapper.get()); + if (!status) { + return Status::InternalError("runtime filter merge failed: {}, error_msg: {}", + debug_string(), status.msg()); + } + return Status::OK(); + } + +private: + RuntimeFilterMerger(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc) + : RuntimeFilter(state, desc) {} + + friend class RuntimeFilterProducer; +}; + +} // namespace doris diff --git a/be/src/exprs/runtime_filter/runtime_filter_producer.cpp b/be/src/exprs/runtime_filter/runtime_filter_producer.cpp new file mode 100644 index 00000000000000..a0cecf1e86e7fe --- /dev/null +++ b/be/src/exprs/runtime_filter/runtime_filter_producer.cpp @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/runtime_filter/runtime_filter_producer.h" + +#include "exprs/runtime_filter/runtime_filter_consumer.h" +#include "exprs/runtime_filter/runtime_filter_merger.h" +#include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" + +namespace doris { + +Status RuntimeFilterProducer::init_with_size(size_t local_size) { + size_t real_size = need_sync_filter_size() ? get_synced_size() : local_size; + if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && + local_size > _wrapper->_max_in_num) { + RETURN_IF_ERROR(_wrapper->change_to_bloom_filter()); + } + + if (get_real_type() == RuntimeFilterType::BLOOM_FILTER) { + RETURN_IF_ERROR(_wrapper->init_bloom_filter(real_size)); + } + return Status::OK(); +} + +Status RuntimeFilterProducer::_send_to_remote_targets(RuntimeState* state, RuntimeFilter* filter, + uint64_t local_merge_time) { + TNetworkAddress addr; + DCHECK(_state != nullptr); + RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr)); + return filter->_push_to_remote(state, &addr, local_merge_time); +}; + +Status RuntimeFilterProducer::_send_to_local_targets( + std::shared_ptr wrapper, bool global, uint64_t local_merge_time) { + std::vector> filters = + global ? _state->global_runtime_filter_mgr()->get_consume_filters(filter_id()) + : _state->local_runtime_filter_mgr()->get_consume_filters(filter_id()); + for (auto filter : filters) { + filter->_wrapper = wrapper; + filter->update_runtime_filter_type_to_profile(local_merge_time); + filter->signal(); + } + return Status::OK(); +}; + +Status RuntimeFilterProducer::publish(RuntimeState* state, bool publish_local) { + auto do_merge = [&]() { + // two case we need do local merge: + // 1. has remote target + // 2. has local target and has global consumer (means target scan has local shuffle) + if (_has_local_target && + _state->global_runtime_filter_mgr()->get_consume_filters(filter_id()).empty()) { + // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless + return Status::OK(); + } + LocalMergeFilters* local_merge_filters = nullptr; + RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( + filter_id(), &local_merge_filters)); + local_merge_filters->merge_watcher.start(); + std::lock_guard l(*local_merge_filters->lock); + RETURN_IF_ERROR(local_merge_filters->merger->merge_from(this)); + local_merge_filters->merge_time--; + local_merge_filters->merge_watcher.stop(); + if (local_merge_filters->merge_time == 0) { + if (_has_local_target) { + RETURN_IF_ERROR( + _send_to_local_targets(local_merge_filters->merger->_wrapper, true, + local_merge_filters->merge_watcher.elapsed_time())); + } else { + RETURN_IF_ERROR( + _send_to_remote_targets(state, local_merge_filters->merger.get(), + local_merge_filters->merge_watcher.elapsed_time())); + } + } + return Status::OK(); + }; + + if (_has_local_target) { + // A runtime filter may have multiple targets and some of those are local-merge RF and others are not. + // So for all runtime filters' producers, `publish` should notify all consumers in global RF mgr which manages local-merge RF and local RF mgr which manages others. + RETURN_IF_ERROR(do_merge()); + RETURN_IF_ERROR(_send_to_local_targets(_wrapper, false, 0)); + } else if (!publish_local) { + if (_is_broadcast_join) { + RETURN_IF_ERROR(_send_to_remote_targets(state, this, 0)); + } else { + RETURN_IF_ERROR(do_merge()); + } + } else { + // remote broadcast join only push onetime in build shared hash table + // publish_local only set true on copy shared hash table + DCHECK(_is_broadcast_join); + } + return Status::OK(); +} + +class SyncSizeClosure : public AutoReleaseClosure> { + std::shared_ptr _dependency; + // Should use weak ptr here, because when query context deconstructs, should also delete runtime filter + // context, it not the memory is not released. And rpc is in another thread, it will hold rf context + // after query context because the rpc is not returned. + std::weak_ptr _rf_context; + using Base = + AutoReleaseClosure>; + ENABLE_FACTORY_CREATOR(SyncSizeClosure); + + void _process_if_rpc_failed() override { + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context.lock(); + if (!ctx) { + return; + } + + ctx->err_msg = cntl_->ErrorText(); + Base::_process_if_rpc_failed(); + } + + void _process_if_meet_error_status(const Status& status) override { + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context.lock(); + if (!ctx) { + return; + } + + if (status.is()) { + // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case. + ctx->ignored = true; + } else { + ctx->err_msg = status.to_string(); + Base::_process_if_meet_error_status(status); + } + } + +public: + SyncSizeClosure(std::shared_ptr req, + std::shared_ptr> callback, + std::shared_ptr dependency, + RuntimeFilterContextSPtr rf_context, std::weak_ptr context) + : Base(req, callback, context), + _dependency(std::move(dependency)), + _rf_context(rf_context) {} +}; + +Status RuntimeFilterProducer::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { + // two case we need do local merge: + // 1. has remote target + // 2. has local target and has global consumer (means target scan has local shuffle) + if (_has_remote_target || + !_state->global_runtime_filter_mgr()->get_consume_filters(filter_id()).empty()) { + LocalMergeFilters* local_merge_filters = nullptr; + RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters( + filter_id(), &local_merge_filters)); + std::lock_guard l(*local_merge_filters->lock); + local_merge_filters->merge_size_times--; + local_merge_filters->local_merged_size += local_filter_size; + if (local_merge_filters->merge_size_times) { + return Status::OK(); + } else { + if (_has_local_target) { + for (auto filter : local_merge_filters->producers) { + filter->set_synced_size(local_merge_filters->local_merged_size); + } + return Status::OK(); + } else { + local_filter_size = local_merge_filters->local_merged_size; + } + } + } else if (_has_local_target) { + set_synced_size(local_filter_size); + return Status::OK(); + } + + TNetworkAddress addr; + DCHECK(_state != nullptr); + RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr)); + std::shared_ptr stub( + _state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr)); + if (!stub) { + return Status::InternalError("Get rpc stub failed, host={}, port={}", addr.hostname, + addr.port); + } + + auto request = std::make_shared(); + auto callback = DummyBrpcCallback::create_shared(); + // RuntimeFilter maybe deconstructed before the rpc finished, so that could not use + // a raw pointer in closure. Has to use the context's shared ptr. + auto closure = SyncSizeClosure::create_unique( + request, callback, _dependency, _wrapper->_context, + state->query_options().ignore_runtime_filter_error ? std::weak_ptr {} + : state->get_query_ctx_weak()); + auto* pquery_id = request->mutable_query_id(); + pquery_id->set_hi(_state->get_query_ctx()->query_id().hi); + pquery_id->set_lo(_state->get_query_ctx()->query_id().lo); + + auto* source_addr = request->mutable_source_addr(); + source_addr->set_hostname(BackendOptions::get_local_backend().host); + source_addr->set_port(BackendOptions::get_local_backend().brpc_port); + + request->set_filter_size(local_filter_size); + request->set_filter_id(filter_id()); + + callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout())); + if (config::execution_ignore_eovercrowded) { + callback->cntl_->ignore_eovercrowded(); + } + + stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), + closure.get()); + closure.release(); + return Status::OK(); +} + +void RuntimeFilterProducer::set_finish_dependency( + const std::shared_ptr& dependency) { + _dependency = dependency; + _dependency->add(); + CHECK(_dependency); +} + +void RuntimeFilterProducer::set_synced_size(uint64_t global_size) { + _synced_size = global_size; + if (_dependency) { + _dependency->sub(); + } +} + +} // namespace doris diff --git a/be/src/exprs/runtime_filter/runtime_filter_producer.h b/be/src/exprs/runtime_filter/runtime_filter_producer.h new file mode 100644 index 00000000000000..14a2651a3ad128 --- /dev/null +++ b/be/src/exprs/runtime_filter/runtime_filter_producer.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exprs/runtime_filter/runtime_filter.h" + +namespace doris { + +class RuntimeFilterProducer : public RuntimeFilter { +public: + static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + std::shared_ptr* res) { + *res = std::shared_ptr(new RuntimeFilterProducer(state, desc)); + return (*res)->_init_with_desc(desc, &state->get_query_ctx()->query_options()); + } + + // insert data to build filter + void insert_batch(vectorized::ColumnPtr column, size_t start) { + _wrapper->insert_batch(column, start); + } + + int expr_order() const { return _expr_order; } + + bool need_sync_filter_size() { + return _wrapper->get_build_bf_cardinality() && !_is_broadcast_join; + } + + Status init_with_size(size_t local_size); + + Status send_filter_size(RuntimeState* state, uint64_t local_filter_size); + + Status publish(RuntimeState* state, bool publish_local); + + void set_synced_size(uint64_t global_size); + + void set_finish_dependency( + const std::shared_ptr& dependency); + + int64_t get_synced_size() const { + if (_synced_size == -1 || !_dependency) { + throw Exception(doris::ErrorCode::INTERNAL_ERROR, + "sync filter size meet error, filter: {}", debug_string()); + } + return _synced_size; + } + + RuntimeFilterContextSPtr& get_shared_context_ref() { return _wrapper->_context; } + + std::string debug_string() const { + return fmt::format("RuntimeFilterProducer: ({}, dependency: {}, synced_size: {}]", + _debug_string(), _dependency ? _dependency->debug_string() : "none", + _synced_size); + } + +private: + RuntimeFilterProducer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc) + : RuntimeFilter(state, desc), + _is_broadcast_join(desc->is_broadcast_join), + _expr_order(desc->expr_order) {} + + Status _send_to_remote_targets(RuntimeState* state, RuntimeFilter* filter, + uint64_t local_merge_time); + Status _send_to_local_targets(std::shared_ptr wrapper, bool global, + uint64_t local_merge_time); + + bool _is_broadcast_join; + int _expr_order; + + int64_t _synced_size = -1; + std::shared_ptr _dependency; +}; + +} // namespace doris diff --git a/be/src/exprs/runtime_filter/runtime_filter_slots.h b/be/src/exprs/runtime_filter/runtime_filter_slots.h index ff1d87c5a6f13e..ff23b23b9bc9b9 100644 --- a/be/src/exprs/runtime_filter/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter/runtime_filter_slots.h @@ -19,6 +19,7 @@ #include "common/status.h" #include "exprs/runtime_filter/runtime_filter.h" +#include "exprs/runtime_filter/runtime_filter_producer.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" // IWYU pragma: keep @@ -31,7 +32,7 @@ class RuntimeFilterSlots { public: RuntimeFilterSlots(const vectorized::VExprContextSPtrs& build_expr_ctxs, RuntimeProfile* profile, - const std::vector>& runtime_filters, + const std::vector>& runtime_filters, bool should_build_hash_table) : _build_expr_context(build_expr_ctxs), _runtime_filters(runtime_filters), @@ -96,7 +97,7 @@ class RuntimeFilterSlots { } const std::vector>& _build_expr_context; - std::vector> _runtime_filters; + std::vector> _runtime_filters; bool _should_build_hash_table; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; diff --git a/be/src/exprs/runtime_filter/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter/runtime_filter_slots_cross.h index 860ef3a2ae6e31..06473857c964e8 100644 --- a/be/src/exprs/runtime_filter/runtime_filter_slots_cross.h +++ b/be/src/exprs/runtime_filter/runtime_filter_slots_cross.h @@ -32,10 +32,10 @@ namespace doris { // this class used in cross join node class RuntimeFilterSlotsCross : public RuntimeFilterSlots { public: - RuntimeFilterSlotsCross(const vectorized::VExprContextSPtrs& build_expr_ctxs, - RuntimeProfile* profile, - const std::vector>& runtime_filters, - bool should_build_hash_table) + RuntimeFilterSlotsCross( + const vectorized::VExprContextSPtrs& build_expr_ctxs, RuntimeProfile* profile, + const std::vector>& runtime_filters, + bool should_build_hash_table) : RuntimeFilterSlots(build_expr_ctxs, profile, runtime_filters, should_build_hash_table) {} diff --git a/be/src/exprs/runtime_filter/runtime_filter_wrapper.h b/be/src/exprs/runtime_filter/runtime_filter_wrapper.h index 8fd86fe99c9181..dd810f95f63799 100644 --- a/be/src/exprs/runtime_filter/runtime_filter_wrapper.h +++ b/be/src/exprs/runtime_filter/runtime_filter_wrapper.h @@ -20,6 +20,7 @@ #include "common/status.h" #include "exprs/bloom_filter_func.h" #include "exprs/runtime_filter/runtime_filter_definitions.h" +#include "runtime/runtime_state.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/exprs/vruntimefilter_wrapper.h" @@ -115,6 +116,9 @@ class RuntimePredicateWrapper { } friend class RuntimeFilter; + friend class RuntimeFilterProducer; + friend class RuntimeFilterConsumer; + friend class RuntimeFilterMerger; void set_filter_id(int id); @@ -153,6 +157,14 @@ class RuntimePredicateWrapper { } } + std::string debug_string() const { + return fmt::format( + "filter_id: {}, ignored: {}, disabled: {}, build_bf_cardinality: {}, error_msg: " + "[{}]", + _filter_id, _context->ignored, _context->disabled, get_build_bf_cardinality(), + _context->err_msg); + } + private: // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid. PrimitiveType _column_return_type; // column type diff --git a/be/src/pipeline/common/runtime_filter_consumer.cpp b/be/src/pipeline/common/runtime_filter_consumer_operator.cpp similarity index 83% rename from be/src/pipeline/common/runtime_filter_consumer.cpp rename to be/src/pipeline/common/runtime_filter_consumer_operator.cpp index 69497f19251431..433ff567ee4d5b 100644 --- a/be/src/pipeline/common/runtime_filter_consumer.cpp +++ b/be/src/pipeline/common/runtime_filter_consumer_operator.cpp @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include "pipeline/common/runtime_filter_consumer.h" +#include "pipeline/common/runtime_filter_consumer_operator.h" +#include "exprs/runtime_filter/runtime_filter_consumer.h" #include "pipeline/pipeline_task.h" namespace doris::pipeline { -RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, - const std::vector& runtime_filters, - const RowDescriptor& row_descriptor, - vectorized::VExprContextSPtrs& conjuncts) +RuntimeFilterConsumerOperator::RuntimeFilterConsumerOperator( + const int32_t filter_id, const std::vector& runtime_filters, + const RowDescriptor& row_descriptor, vectorized::VExprContextSPtrs& conjuncts) : _filter_id(filter_id), _runtime_filter_descs(runtime_filters), _row_descriptor_ref(row_descriptor), @@ -32,13 +32,13 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, _blocked_by_rf = std::make_shared(false); } -Status RuntimeFilterConsumer::init(RuntimeState* state, bool need_local_merge) { +Status RuntimeFilterConsumerOperator::init(RuntimeState* state, bool need_local_merge) { _state = state; RETURN_IF_ERROR(_register_runtime_filter(need_local_merge)); return Status::OK(); } -void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) { +void RuntimeFilterConsumerOperator::_init_profile(RuntimeProfile* profile) { fmt::memory_buffer buffer; for (auto& rf_ctx : _runtime_filter_ctxs) { rf_ctx.runtime_filter->init_profile(profile); @@ -47,22 +47,21 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) { profile->add_info_string("RuntimeFilters: ", to_string(buffer)); } -Status RuntimeFilterConsumer::_register_runtime_filter(bool need_local_merge) { +Status RuntimeFilterConsumerOperator::_register_runtime_filter(bool need_local_merge) { int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.reserve(filter_size); _runtime_filter_ready_flag.reserve(filter_size); for (int i = 0; i < filter_size; ++i) { - std::shared_ptr runtime_filter; - const auto& filter_desc = _runtime_filter_descs[i]; - RETURN_IF_ERROR(_state->register_consumer_runtime_filter(filter_desc, need_local_merge, - _filter_id, &runtime_filter)); - _runtime_filter_ctxs.emplace_back(runtime_filter); + std::shared_ptr filter; + RETURN_IF_ERROR(_state->register_consumer_runtime_filter( + _runtime_filter_descs[i], need_local_merge, _filter_id, &filter)); + _runtime_filter_ctxs.emplace_back(filter); _runtime_filter_ready_flag.emplace_back(false); } return Status::OK(); } -void RuntimeFilterConsumer::init_runtime_filter_dependency( +void RuntimeFilterConsumerOperator::init_runtime_filter_dependency( std::vector>& runtime_filter_dependencies, const int id, const int node_id, const std::string& name) { @@ -98,7 +97,7 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( } } -Status RuntimeFilterConsumer::_acquire_runtime_filter() { +Status RuntimeFilterConsumerOperator::_acquire_runtime_filter() { SCOPED_TIMER(_acquire_runtime_filter_timer); std::vector vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { @@ -118,7 +117,7 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() { return Status::OK(); } -Status RuntimeFilterConsumer::_append_rf_into_conjuncts( +Status RuntimeFilterConsumerOperator::_append_rf_into_conjuncts( const std::vector& vexprs) { if (vexprs.empty()) { return Status::OK(); @@ -134,7 +133,7 @@ Status RuntimeFilterConsumer::_append_rf_into_conjuncts( return Status::OK(); } -Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { +Status RuntimeFilterConsumerOperator::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { if (_is_all_rf_applied) { *arrived_rf_num = _runtime_filter_descs.size(); return Status::OK(); @@ -175,7 +174,7 @@ Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive return Status::OK(); } -void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) { +void RuntimeFilterConsumerOperator::_prepare_rf_timer(RuntimeProfile* profile) { _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime"); } diff --git a/be/src/pipeline/common/runtime_filter_consumer.h b/be/src/pipeline/common/runtime_filter_consumer_operator.h similarity index 82% rename from be/src/pipeline/common/runtime_filter_consumer.h rename to be/src/pipeline/common/runtime_filter_consumer_operator.h index 98472ec8ce58c7..5fbe84c33e53cf 100644 --- a/be/src/pipeline/common/runtime_filter_consumer.h +++ b/be/src/pipeline/common/runtime_filter_consumer_operator.h @@ -19,18 +19,18 @@ #include -#include "exprs/runtime_filter/runtime_filter.h" #include "pipeline/dependency.h" +#include "vec/exprs/vruntimefilter_wrapper.h" namespace doris::pipeline { -class RuntimeFilterConsumer { +class RuntimeFilterConsumerOperator { public: - RuntimeFilterConsumer(const int32_t filter_id, - const std::vector& runtime_filters, - const RowDescriptor& row_descriptor, - vectorized::VExprContextSPtrs& conjuncts); - ~RuntimeFilterConsumer() = default; + RuntimeFilterConsumerOperator(const int32_t filter_id, + const std::vector& runtime_filters, + const RowDescriptor& row_descriptor, + vectorized::VExprContextSPtrs& conjuncts); + ~RuntimeFilterConsumerOperator() = default; Status init(RuntimeState* state, bool need_local_merge = false); @@ -57,10 +57,11 @@ class RuntimeFilterConsumer { // For runtime filters struct RuntimeFilterContext { - RuntimeFilterContext(std::shared_ptr rf) : runtime_filter(std::move(rf)) {} + RuntimeFilterContext(std::shared_ptr rf) + : runtime_filter(std::move(rf)) {} // set to true if this runtime filter is already applied to vconjunct_ctx_ptr bool apply_mark = false; - std::shared_ptr runtime_filter; + std::shared_ptr runtime_filter; }; std::vector _runtime_filter_ctxs; diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 465e2551938015..2ddc2125ee6e37 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -21,7 +21,7 @@ #include #include "common/logging.h" -#include "exprs/runtime_filter/runtime_filter.h" +#include "exprs/runtime_filter/runtime_filter_consumer.h" #include "pipeline/exec/multi_cast_data_streamer.h" #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_task.h" diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index a8deafbfdd486b..b606185ba0d051 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -273,12 +273,13 @@ struct RuntimeFilterTimerQueue { class RuntimeFilterDependency final : public Dependency { public: - RuntimeFilterDependency(int id, int node_id, std::string name, RuntimeFilter* runtime_filter) + RuntimeFilterDependency(int id, int node_id, std::string name, + RuntimeFilterConsumer* runtime_filter) : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {} std::string debug_string(int indentation_level = 0) override; private: - const RuntimeFilter* _runtime_filter = nullptr; + const RuntimeFilterConsumer* _runtime_filter = nullptr; }; struct AggSharedState : public BasicSharedState { diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 96e111257f6682..616b5ef1829cf4 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -19,7 +19,7 @@ #include -#include "exprs/runtime_filter/runtime_filter.h" +#include "exprs/runtime_filter/runtime_filter_consumer.h" #include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h" #include "pipeline/common/data_gen_functions/vnumbers_tvf.h" #include "pipeline/exec/operator.h" @@ -96,10 +96,10 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. for (const auto& filter_desc : p._runtime_filter_descs) { - std::shared_ptr runtime_filter; + std::shared_ptr filter; RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, p.is_serial_operator(), - p.node_id(), &runtime_filter)); - runtime_filter->init_profile(_runtime_profile.get()); + p.node_id(), &filter)); + filter->init_profile(_runtime_profile.get()); } return Status::OK(); } diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index 2cb3cd5e0b29ce..c269610728d5d4 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -79,11 +79,11 @@ Status EsScanLocalState::_init_scanners(std::list* sca } properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range->shard_id); properties[ESScanReader::KEY_BATCH_SIZE] = - std::to_string(RuntimeFilterConsumer::_state->batch_size()); + std::to_string(RuntimeFilterConsumerOperator::_state->batch_size()); properties[ESScanReader::KEY_HOST_PORT] = get_host_and_port(es_scan_range->es_hosts); // push down limit to Elasticsearch // if predicate in _conjuncts can not be processed by Elasticsearch, we can not push down limit operator to Elasticsearch - if (p.limit() != -1 && p.limit() <= RuntimeFilterConsumer::_state->batch_size() && + if (p.limit() != -1 && p.limit() <= RuntimeFilterConsumerOperator::_state->batch_size() && p.conjuncts().empty()) { properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(p.limit()); } @@ -93,11 +93,11 @@ Status EsScanLocalState::_init_scanners(std::list* sca properties, p._column_names, p._docvalue_context, &doc_value_mode); std::shared_ptr scanner = vectorized::NewEsScanner::create_shared( - RuntimeFilterConsumer::_state, this, p._limit, p._tuple_id, properties, + RuntimeFilterConsumerOperator::_state, this, p._limit, p._tuple_id, properties, p._docvalue_context, doc_value_mode, - RuntimeFilterConsumer::_state->runtime_profile()); + RuntimeFilterConsumerOperator::_state->runtime_profile()); - RETURN_IF_ERROR(scanner->prepare(RuntimeFilterConsumer::_state, Base::_conjuncts)); + RETURN_IF_ERROR(scanner->prepare(RuntimeFilterConsumerOperator::_state, Base::_conjuncts)); scanners->push_back(scanner); } diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index 4ed67790278d18..69a6d7073c7442 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -29,7 +29,7 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - const std::vector>& runtime_filters() const { + const std::vector>& runtime_filters() const { return _runtime_filters; } @@ -40,7 +40,7 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState template friend class JoinBuildSinkOperatorX; - std::vector> _runtime_filters; + std::vector> _runtime_filters; }; template diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index e45e59d17e27b3..cc90ac74609218 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -28,13 +28,13 @@ namespace doris::pipeline { MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), - RuntimeFilterConsumer(static_cast(parent)->dest_id_from_sink(), - parent->runtime_filter_descs(), - static_cast(parent)->_row_desc(), _conjuncts) {} + RuntimeFilterConsumerOperator(static_cast(parent)->dest_id_from_sink(), + parent->runtime_filter_descs(), + static_cast(parent)->_row_desc(), _conjuncts) {} Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); + RETURN_IF_ERROR(RuntimeFilterConsumerOperator::init(state)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); auto& p = _parent->cast(); @@ -44,7 +44,7 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState _get_data_timer = ADD_TIMER(_runtime_profile, "GetDataTime"); _materialize_data_timer = ADD_TIMER(_runtime_profile, "MaterializeDataTime"); // init profile for runtime filter - RuntimeFilterConsumer::_init_profile(profile()); + RuntimeFilterConsumerOperator::_init_profile(profile()); init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), p.get_name() + "_FILTER_DEPENDENCY"); return Status::OK(); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 57410bf8d9568a..91b19ef023dbd1 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -23,7 +23,7 @@ #include "common/status.h" #include "operator.h" -#include "pipeline/common/runtime_filter_consumer.h" +#include "pipeline/common/runtime_filter_consumer_operator.h" namespace doris { class RuntimeState; @@ -38,7 +38,7 @@ class MultiCastDataStreamer; class MultiCastDataStreamerSourceOperatorX; class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState, - public RuntimeFilterConsumer { + public RuntimeFilterConsumerOperator { public: ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); using Base = PipelineXLocalState; @@ -91,7 +91,6 @@ class MultiCastDataStreamerSourceOperatorX final Status open(RuntimeState* state) override { RETURN_IF_ERROR(Base::open(state)); // init profile for runtime filter - // RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); if (_t_data_stream_sink.__isset.output_exprs) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs, _output_expr_contexts)); diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index b815f85c0e4614..86531f8f7ffdbd 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -29,7 +29,7 @@ #include "olap/parallel_scanner_builder.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" -#include "pipeline/common/runtime_filter_consumer.h" +#include "pipeline/common/runtime_filter_consumer_operator.h" #include "pipeline/exec/scan_operator.h" #include "pipeline/query_cache/query_cache.h" #include "service/backend_options.h" @@ -285,7 +285,7 @@ Status OlapScanLocalState::_init_scanners(std::list* s } SCOPED_TIMER(_scanner_init_timer); - if (!_conjuncts.empty() && RuntimeFilterConsumer::_state->enable_profile()) { + if (!_conjuncts.empty() && RuntimeFilterConsumerOperator::_state->enable_profile()) { std::string message; for (auto& conjunct : _conjuncts) { if (conjunct->root()) { @@ -445,7 +445,7 @@ Status OlapScanLocalState::hold_tablets() { for (size_t i = 0; i < _scan_ranges.size(); i++) { RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers( {0, _tablets[i].version}, &_read_sources[i].rs_splits, - RuntimeFilterConsumer::_state->skip_missing_version())); + RuntimeFilterConsumerOperator::_state->skip_missing_version())); if (!PipelineXLocalState<>::_state->skip_delete_predicate()) { _read_sources[i].fill_delete_predicates(); } diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 439ae4caedfe72..51ea68ac6c84cd 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -22,7 +22,7 @@ #include #include -#include "pipeline/common/runtime_filter_consumer.h" +#include "pipeline/common/runtime_filter_consumer_operator.h" #include "pipeline/exec/es_scan_operator.h" #include "pipeline/exec/file_scan_operator.h" #include "pipeline/exec/group_commit_scan_operator.h" @@ -73,9 +73,9 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); auto& p = _parent->cast(); - RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, p.is_serial_operator())); + RETURN_IF_ERROR(RuntimeFilterConsumerOperator::init(state, p.is_serial_operator())); // init profile for runtime filter - RuntimeFilterConsumer::_init_profile(profile()); + RuntimeFilterConsumerOperator::_init_profile(profile()); init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), p.get_name() + "_FILTER_DEPENDENCY"); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index adad5de9ba1aa0..f418e24b899e9e 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -25,7 +25,7 @@ #include "common/status.h" #include "exprs/function_filter.h" #include "operator.h" -#include "pipeline/common/runtime_filter_consumer.h" +#include "pipeline/common/runtime_filter_consumer_operator.h" #include "pipeline/dependency.h" #include "runtime/descriptors.h" #include "runtime/types.h" @@ -62,12 +62,12 @@ struct FilterPredicates { std::vector>> in_filters; }; -class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterConsumer { +class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterConsumerOperator { public: ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<>(state, parent), - RuntimeFilterConsumer(parent->node_id(), parent->runtime_filter_descs(), - parent->row_descriptor(), _conjuncts) {} + RuntimeFilterConsumerOperator(parent->node_id(), parent->runtime_filter_descs(), + parent->row_descriptor(), _conjuncts) {} ~ScanLocalStateBase() override = default; [[nodiscard]] virtual bool should_run_serial() const = 0; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1e8d94fd983477..63dbd95afb2aa9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include #include @@ -1315,8 +1315,8 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, SCOPED_ATTACH_TASK(pip_context->get_query_ctx()); // 1. get the target filters - std::vector> filters; - RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), filters)); + std::vector> filters = + runtime_filter_mgr->get_consume_filters(request->filter_id()); // 2. create the filter wrapper to replace or ignore the target filters if (!filters.empty()) { diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 32c9c253462fbe..38a94e331f25b3 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -33,7 +33,9 @@ #include "common/logging.h" #include "common/status.h" #include "exprs/runtime_filter/runtime_filter.h" -#include "exprs/runtime_filter/runtime_filter_wrapper.h" +#include "exprs/runtime_filter/runtime_filter_consumer.h" +#include "exprs/runtime_filter/runtime_filter_merger.h" +#include "exprs/runtime_filter/runtime_filter_producer.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/query_context.h" @@ -58,48 +60,28 @@ RuntimeFilterMgr::~RuntimeFilterMgr() { _pool.clear(); } -Status RuntimeFilterMgr::get_consume_filters( - const int filter_id, std::vector>& consumer_filters) { - DCHECK(_is_global); - std::lock_guard l(_lock); - auto iter = _consumer_map.find(filter_id); - if (iter == _consumer_map.end()) { - return Status::InternalError("get_consume_filters meet unknown filter: {}, role: CONSUMER.", - filter_id); - } - for (auto& holder : iter->second) { - consumer_filters.emplace_back(holder.filter); - } - return Status::OK(); -} - -std::vector> RuntimeFilterMgr::get_consume_filters( - const int filter_id) { +std::vector> RuntimeFilterMgr::get_consume_filters( + int filter_id) { std::lock_guard l(_lock); auto iter = _consumer_map.find(filter_id); if (iter == _consumer_map.end()) { return {}; } - std::vector> consumer_filters; - for (auto& holder : iter->second) { - consumer_filters.emplace_back(holder.filter); - } - return consumer_filters; + return iter->second; } -Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id, - std::shared_ptr* consumer_filter, - bool need_local_merge) { +Status RuntimeFilterMgr::register_consumer_filter( + const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, + std::shared_ptr* consumer_filter, bool need_local_merge) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; bool has_exist = false; std::lock_guard l(_lock); if (auto iter = _consumer_map.find(key); iter != _consumer_map.end()) { - for (auto holder : iter->second) { - if (holder.node_id == node_id) { - *consumer_filter = holder.filter; + for (auto filter : iter->second) { + if (filter->node_id() == node_id) { + *consumer_filter = filter; has_exist = true; } } @@ -108,10 +90,9 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc DCHECK(!(_is_global xor need_local_merge)) << " _is_global: " << _is_global << " need_local_merge: " << need_local_merge; if (!has_exist) { - std::shared_ptr filter; - RETURN_IF_ERROR(RuntimeFilter::create(_state, &desc, RuntimeFilterRole::CONSUMER, node_id, - &filter)); - _consumer_map[key].emplace_back(node_id, filter); + std::shared_ptr filter; + RETURN_IF_ERROR(RuntimeFilterConsumer::create(_state, &desc, node_id, &filter)); + _consumer_map[key].push_back(filter); *consumer_filter = filter; } else if (!need_local_merge) { return Status::InvalidArgument("filter has registered"); @@ -120,9 +101,9 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc return Status::OK(); } -Status RuntimeFilterMgr::register_local_merge_producer_filter( +Status RuntimeFilterMgr::register_local_merger_filter( const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, - std::shared_ptr producer_filter) { + std::shared_ptr producer_filter) { DCHECK(_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -140,16 +121,15 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( DCHECK(_state != nullptr); { std::lock_guard l(*iter->second.lock); - if (iter->second.filters.empty()) { - std::shared_ptr merge_filter; - RETURN_IF_ERROR(RuntimeFilter::create(_state, &desc, RuntimeFilterRole::MERGER, -1, - &merge_filter)); + if (!iter->second.merger) { + std::shared_ptr merge_filter; + RETURN_IF_ERROR(RuntimeFilterMerger::create(_state, &desc, &merge_filter)); merge_filter->set_ignored(); - iter->second.filters.emplace_back(merge_filter); + iter->second.merger = merge_filter; } iter->second.merge_time++; iter->second.merge_size_times++; - iter->second.filters.emplace_back(producer_filter); + iter->second.producers.emplace_back(producer_filter); } return Status::OK(); } @@ -166,13 +146,13 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( filter_id); } *local_merge_filters = &iter->second; - DCHECK(!iter->second.filters.empty()); + DCHECK(iter->second.merger); return Status::OK(); } -Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, - std::shared_ptr* producer_filter) { +Status RuntimeFilterMgr::register_producer_filter( + const TRuntimeFilterDesc& desc, const TQueryOptions& options, + std::shared_ptr* producer_filter) { DCHECK(!_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -183,8 +163,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc if (iter != _producer_map.end()) { return Status::InvalidArgument("filter has registed"); } - RETURN_IF_ERROR( - RuntimeFilter::create(_state, &desc, RuntimeFilterRole::PRODUCER, -1, producer_filter)); + RETURN_IF_ERROR(RuntimeFilterProducer::create(_state, &desc, producer_filter)); _producer_map.emplace(key, *producer_filter); return Status::OK(); } @@ -217,8 +196,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cnt_val->producer_size = producer_size; cnt_val->runtime_filter_desc = *runtime_filter_desc; cnt_val->targetv2_info = targetv2_info; - RETURN_IF_ERROR(RuntimeFilter::create(_state, runtime_filter_desc, RuntimeFilterRole::MERGER, - -1, &cnt_val->filter)); + RETURN_IF_ERROR(RuntimeFilterMerger::create(_state, runtime_filter_desc, &cnt_val->filter)); auto filter_id = runtime_filter_desc->filter_id; cnt_val->filter->set_ignored(); @@ -318,9 +296,8 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptrfilter_id(), &local_merge_filters)); - // first filter size merged filter - for (size_t i = 1; i < local_merge_filters->filters.size(); i++) { - local_merge_filters->filters[i]->set_synced_size(request->filter_size()); + for (auto producer : local_merge_filters->producers) { + producer->set_synced_size(request->filter_size()); } return Status::OK(); } @@ -353,9 +330,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr que if (cnt_val->arrive_id.size() == 1 && cnt_val->runtime_filter_desc.is_broadcast_join) { return Status::OK(); } - std::shared_ptr tmp_filter; - RETURN_IF_ERROR(RuntimeFilter::create(_state, &cnt_val->runtime_filter_desc, - RuntimeFilterRole::MERGER, -1, &tmp_filter)); + std::shared_ptr tmp_filter; + RETURN_IF_ERROR( + RuntimeFilterMerger::create(_state, &cnt_val->runtime_filter_desc, &tmp_filter)); RETURN_IF_ERROR(tmp_filter->assign_data_into_wrapper(*request, attach_data)); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index c0d9d3e378dc61..00d40c9d2c3adc 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -44,7 +44,9 @@ class IOBufAsZeroCopyInputStream; namespace doris { class PPublishFilterRequestV2; class PMergeFilterRequest; -class RuntimeFilter; +class RuntimeFilterMerger; +class RuntimeFilterProducer; +class RuntimeFilterConsumer; class MemTracker; class MemTrackerLimiter; class RuntimeState; @@ -59,7 +61,8 @@ struct LocalMergeFilters { int merge_time = 0; int merge_size_times = 0; uint64_t local_merged_size = 0; - std::vector> filters; + std::shared_ptr merger; + std::vector> producers; MonotonicStopWatch merge_watcher; }; @@ -83,11 +86,9 @@ class RuntimeFilterMgr { ~RuntimeFilterMgr(); - Status get_consume_filters(const int filter_id, - std::vector>& consumer_filters); - std::vector> get_consume_filters(const int filter_id); + std::vector> get_consume_filters(int filter_id); - std::shared_ptr try_get_product_filter(const int filter_id) { + std::shared_ptr try_get_product_filter(const int filter_id) { std::lock_guard l(_lock); auto iter = _producer_map.find(filter_id); if (iter == _producer_map.end()) { @@ -98,17 +99,18 @@ class RuntimeFilterMgr { // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - int node_id, std::shared_ptr* consumer_filter, + int node_id, + std::shared_ptr* consumer_filter, bool need_local_merge = false); - Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, - std::shared_ptr producer_filter); + Status register_local_merger_filter(const TRuntimeFilterDesc& desc, + const TQueryOptions& options, + std::shared_ptr producer_filter); Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr* producer_filter); + std::shared_ptr* producer_filter); // update filter by remote void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); @@ -118,10 +120,6 @@ class RuntimeFilterMgr { Status sync_filter_size(const PSyncFilterSizeRequest* request); private: - struct ConsumerFilterHolder { - int node_id; - std::shared_ptr filter; - }; /** * `_is_global = true` means this runtime filter manager menages query-level runtime filters. * If so, all consumers in this query shared the same RF with the same ID. For producers, all @@ -137,8 +135,8 @@ class RuntimeFilterMgr { // RuntimeFilterMgr is owned by RuntimeState, so we only // use filter_id as key // key: "filter-id" - std::map> _consumer_map; - std::map> _producer_map; + std::map>> _consumer_map; + std::map> _producer_map; std::map _local_merge_producer_map; RuntimeFilterParamsContext* _state = nullptr; @@ -180,7 +178,7 @@ class RuntimeFilterMergeControllerEntity { uint64_t global_size; TRuntimeFilterDesc runtime_filter_desc; std::vector targetv2_info; - std::shared_ptr filter = nullptr; + std::shared_ptr filter = nullptr; std::unordered_set arrive_id; std::vector source_addrs; uint64_t local_merge_time = 0; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index f431bca135a2d2..ee665a4cb3a684 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -490,19 +490,19 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() { } Status RuntimeState::register_producer_runtime_filter( - const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter) { + const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter) { // Producers are created by local runtime filter mgr and shared by global runtime filter manager. // When RF is published, consumers in both global and local RF mgr will be found. RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, query_options(), producer_filter)); - RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( - desc, query_options(), *producer_filter)); + RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_filter(desc, query_options(), + *producer_filter)); return Status::OK(); } Status RuntimeState::register_consumer_runtime_filter( const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, - std::shared_ptr* consumer_filter) { + std::shared_ptr* consumer_filter) { if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, consumer_filter, true); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 448cc624254867..1c16f4167112e5 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -70,6 +70,8 @@ class ExecEnv; class RuntimeFilterMgr; class MemTrackerLimiter; class QueryContext; +class RuntimeFilterConsumer; +class RuntimeFilterProducer; // A collection of items that are part of the global state of a // query and shared across all execution nodes of that query. @@ -545,12 +547,13 @@ class RuntimeState { return _task_execution_context; } - Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc, - std::shared_ptr* producer_filter); + Status register_producer_runtime_filter( + const doris::TRuntimeFilterDesc& desc, + std::shared_ptr* producer_filter); - Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc, - bool need_local_merge, int node_id, - std::shared_ptr* producer_filter); + Status register_consumer_runtime_filter( + const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, + std::shared_ptr* consumer_filter); bool is_nereids() const; bool enable_join_spill() const {