From 8e38763572a2c58ec8c65406ec7e665a3b153d09 Mon Sep 17 00:00:00 2001 From: holmes1412 Date: Tue, 30 Jan 2024 21:10:35 +0800 Subject: [PATCH] RPCModule : 1. fix : add lock and flag when RPCFilterPolicy report(); 2. feat : refactor the usage of RPCMetricsFilter::Collector; 3. feat : add filter_name and differentiate the var reported by each filter; 4. fix : add lock for SummaryVar::observe(); --- src/module/rpc_filter.h | 21 +++ src/module/rpc_metrics_filter.cc | 207 ++++++++++++++++++-------- src/module/rpc_metrics_filter.h | 67 +++++---- src/var/rpc_var.cc | 17 +++ src/var/rpc_var.h | 19 +++ tools/templates/common/config.json | 1 + tools/templates/config/config_full.cc | 4 +- 7 files changed, 245 insertions(+), 91 deletions(-) diff --git a/src/module/rpc_filter.h b/src/module/rpc_filter.h index e0b71a94..32a2ee82 100644 --- a/src/module/rpc_filter.h +++ b/src/module/rpc_filter.h @@ -70,9 +70,20 @@ class RPCFilter this->module_type = module_type; } + RPCFilter(const std::string name, enum RPCModuleType module_type) + { + size_t pos = name.find("::"); + if (pos != std::string::npos) + this->filter_name = name.substr(0, pos) + "::"; + else + this->filter_name = name + "::"; + this->module_type = module_type; + } + virtual ~RPCFilter() { } enum RPCModuleType get_module_type() const { return this->module_type; } + const std::string& get_name() const { return this->filter_name; } virtual bool client_begin(SubTask *task, RPCModuleData& data) { @@ -91,8 +102,18 @@ class RPCFilter return true; } + const std::string raw_var_name(const std::string& name) const + { + size_t pos = name.find("::"); + if (pos != std::string::npos) + return name.substr(pos + 2); + else + return name; + } + private: enum RPCModuleType module_type; + std::string filter_name; }; } // end namespace srpc diff --git a/src/module/rpc_metrics_filter.cc b/src/module/rpc_metrics_filter.cc index a72f5739..da4903c8 100644 --- a/src/module/rpc_metrics_filter.cc +++ b/src/module/rpc_metrics_filter.cc @@ -61,6 
+61,15 @@ RPCMetricsFilter::RPCMetricsFilter() : {{0.5, 0.05}, {0.9, 0.01}}); } +RPCMetricsFilter::RPCMetricsFilter(const std::string &name) : + RPCFilter(name, RPCModuleTypeMetrics) /* forward the filter name so get_name() returns the "<name>::" prefix used by the accessors and create_*() below */ +{ + this->create_gauge(METRICS_REQUEST_COUNT, "total request count"); + this->create_counter(METRICS_REQUEST_METHOD, "request method statistics"); + this->create_summary(METRICS_REQUEST_LATENCY, "request latency nano seconds", + {{0.5, 0.05}, {0.9, 0.01}}); +} + bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data) { this->gauge(METRICS_REQUEST_COUNT)->increase(); @@ -85,33 +94,38 @@ bool RPCMetricsFilter::server_end(SubTask *task, RPCModuleData& data) GaugeVar *RPCMetricsFilter::gauge(const std::string& name) { + std::string var_name = this->get_name() + name; /* same prefixed key that create_gauge() inserts into var_names */ - return RPCVarFactory::gauge(name); + return RPCVarFactory::gauge(var_name); } CounterVar *RPCMetricsFilter::counter(const std::string& name) { + std::string var_name = this->get_name() + name; /* prefixed key, see create_counter() */ - return RPCVarFactory::counter(name); + return RPCVarFactory::counter(var_name); } HistogramVar *RPCMetricsFilter::histogram(const std::string& name) { + std::string var_name = this->get_name() + name; /* prefixed key, see create_histogram() */ - return RPCVarFactory::histogram(name); + return RPCVarFactory::histogram(var_name); } SummaryVar *RPCMetricsFilter::summary(const std::string& name) { + std::string var_name = this->get_name() + name; /* prefixed key, see create_summary() */ - return RPCVarFactory::summary(name); + return RPCVarFactory::summary(var_name); } -GaugeVar *RPCMetricsFilter::create_gauge(const std::string& name, +GaugeVar *RPCMetricsFilter::create_gauge(const std::string& str, const std::string& help) { - if (RPCVarFactory::check_name_format(name) == false) + if (RPCVarFactory::check_name_format(str) == false) { errno = EINVAL; return NULL; } + std::string name = this->get_name() + str; this->mutex.lock(); const auto it = var_names.insert(name); this->mutex.unlock(); @@ -127,15 +141,16 @@ GaugeVar *RPCMetricsFilter::create_gauge(const std::string& name, return gauge; } -CounterVar *RPCMetricsFilter::create_counter(const std::string& name, +CounterVar *RPCMetricsFilter::create_counter(const std::string& str, const std::string& help) { - if
(RPCVarFactory::check_name_format(name) == false) + if (RPCVarFactory::check_name_format(str) == false) { errno = EINVAL; return NULL; } + std::string name = this->get_name() + str; this->mutex.lock(); const auto it = var_names.insert(name); this->mutex.unlock(); @@ -151,16 +166,17 @@ CounterVar *RPCMetricsFilter::create_counter(const std::string& name, return counter; } -HistogramVar *RPCMetricsFilter::create_histogram(const std::string& name, +HistogramVar *RPCMetricsFilter::create_histogram(const std::string& str, const std::string& help, const std::vector& bucket) { - if (RPCVarFactory::check_name_format(name) == false) + if (RPCVarFactory::check_name_format(str) == false) { errno = EINVAL; return NULL; } + std::string name = this->get_name() + str; this->mutex.lock(); const auto it = var_names.insert(name); this->mutex.unlock(); @@ -176,16 +192,17 @@ HistogramVar *RPCMetricsFilter::create_histogram(const std::string& name, return histogram; } -SummaryVar *RPCMetricsFilter::create_summary(const std::string& name, +SummaryVar *RPCMetricsFilter::create_summary(const std::string& str, const std::string& help, const std::vector& quantile) { - if (RPCVarFactory::check_name_format(name) == false) + if (RPCVarFactory::check_name_format(str) == false) { errno = EINVAL; return NULL; } + std::string name = this->get_name() + str; this->mutex.lock(); const auto it = var_names.insert(name); this->mutex.unlock(); @@ -204,18 +221,19 @@ SummaryVar *RPCMetricsFilter::create_summary(const std::string& name, return summary; } -SummaryVar *RPCMetricsFilter::create_summary(const std::string& name, +SummaryVar *RPCMetricsFilter::create_summary(const std::string& str, const std::string& help, const std::vector& quantile, const std::chrono::milliseconds max_age, int age_bucket) { - if (RPCVarFactory::check_name_format(name) == false) + if (RPCVarFactory::check_name_format(str) == false) { errno = EINVAL; return NULL; } + std::string name = this->get_name() + str; this->mutex.lock(); 
const auto it = var_names.insert(name); this->mutex.unlock(); @@ -297,7 +315,7 @@ bool RPCMetricsPull::expose() RPCVar *var = it->second; this->report_output += "# HELP " + var->get_name() + " " + var->get_help() + "\n# TYPE " + var->get_name() + - " " + var->get_type_str() + "\n"; + " " + var->get_type_str() + "\n"; output.clear(); var->collect(&this->collector); this->report_output += output; @@ -373,27 +391,44 @@ void RPCMetricsPull::Collector::collect_summary_end(RPCVar *summary, bool RPCFilterPolicy::report(size_t count) { + bool ret = false; long long timestamp = GET_CURRENT_MS(); if (this->last_report_timestamp == 0) this->last_report_timestamp = timestamp; - if (timestamp > this->last_report_timestamp + - (long long)this->report_interval || - count >= this->report_threshold) + if ((timestamp > this->last_report_timestamp + + (long long)this->report_interval || + count >= this->report_threshold) && + this->reporting == false) { - this->last_report_timestamp = timestamp; - return true; + this->mutex.lock(); + if (this->reporting == false) + { + this->reporting = true; + this->last_report_timestamp = timestamp; + ret = true; + } + this->mutex.unlock(); } - return false; + return ret; } -RPCMetricsOTel::RPCMetricsOTel(const std::string& url, +void RPCFilterPolicy::set_reporting(bool flag) +{ + this->mutex.lock(); + this->reporting = flag; + this->mutex.unlock(); +} + +RPCMetricsOTel::RPCMetricsOTel(const std::string &name, + const std::string& url, unsigned int redirect_max, unsigned int retry_max, size_t report_threshold, size_t report_interval_msec) : + RPCMetricsFilter(name), url(url + OTLP_METRICS_PATH), redirect_max(redirect_max), retry_max(retry_max), @@ -402,7 +437,9 @@ RPCMetricsOTel::RPCMetricsOTel(const std::string& url, { } -RPCMetricsOTel::RPCMetricsOTel(const std::string& url) : +RPCMetricsOTel::RPCMetricsOTel(const std::string &name, + const std::string& url) : + RPCMetricsFilter(name), url(url + OTLP_METRICS_PATH), 
redirect_max(OTLP_HTTP_REDIRECT_MAX), retry_max(OTLP_HTTP_RETRY_MAX), @@ -447,6 +484,8 @@ SubTask *RPCMetricsOTel::create(RPCModuleData& data) KeyValue *attribute; AnyValue *value; + this->report_counts = 0; // this is not very strict but acceptable + InstrumentationScope *scope = metrics->mutable_scope(); scope->set_name(this->scope_name); @@ -483,7 +522,7 @@ SubTask *RPCMetricsOTel::create(RPCModuleData& data) // fprintf(stderr, "[Metrics info to report]\n%s\n", req.DebugString().c_str()); req.SerializeToString(output); - this->report_counts = 0; + this->policy.set_reporting(false); WFHttpTask *task = WFTaskFactory::create_http_task(this->url, this->redirect_max, @@ -522,13 +561,15 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg) this->reduce(tmp); + this->collector.set_current_nano(GET_CURRENT_NS()); + for (it = tmp.begin(); it != tmp.end(); it++) { RPCVar *var = it->second; Metric *m = metrics->add_metrics(); google::protobuf::Message *current_var; - m->set_name(var->get_name()); + m->set_name(this->raw_var_name(var->get_name())); m->set_description(var->get_help()); switch(var->get_type()) @@ -536,21 +577,21 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg) default: case VAR_GAUGE: current_var = m->mutable_gauge(); + this->collector.collect_gauge(var, current_var); break; case VAR_COUNTER: current_var = m->mutable_sum(); + this->collector.collect_counter(var, current_var); break; case VAR_HISTOGRAM: current_var = m->mutable_histogram(); + this->collector.collect_histogram(var, current_var); break; case VAR_SUMMARY: current_var = m->mutable_summary(); + this->collector.collect_summary(var, current_var); break; } - - this->collector.set_current_message(current_var); - this->collector.set_current_nano(GET_CURRENT_NS()); - var->collect(&this->collector); } for (it = tmp.begin(); it != tmp.end(); it++) @@ -559,14 +600,31 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg) return true; } -void 
RPCMetricsOTel::Collector::collect_gauge(RPCVar *gauge, double data) +void RPCMetricsOTel::Collector::collect_gauge(RPCVar *var, + google::protobuf::Message *msg) { - Gauge *report_gauge = static_cast(this->current_msg); + GaugeVar *gauge = (GaugeVar *)var; + Gauge *report_gauge = static_cast(msg); + + double data = gauge->get(); NumberDataPoint *data_points = report_gauge->add_data_points(); data_points->set_as_double(data); data_points->set_time_unix_nano(this->current_timestamp_nano); } +void RPCMetricsOTel::Collector::collect_counter(RPCVar *var, + google::protobuf::Message *msg) +{ + CounterVar *counter = (CounterVar *)var; + + std::unordered_map *data; + data = (std::unordered_map *)counter->get_data(); + + for (auto it = data->begin(); it != data->end(); it++) + this->collect_counter_each(it->first, it->second->get(), msg); +} + + void RPCMetricsOTel::Collector::add_counter_label(const std::string& label) { const char *key; @@ -597,11 +655,11 @@ void RPCMetricsOTel::Collector::add_counter_label(const std::string& label) this->label_map.emplace(label, m); } -void RPCMetricsOTel::Collector::collect_counter_each(RPCVar *counter, - const std::string& label, - double data) +void RPCMetricsOTel::Collector::collect_counter_each(const std::string& label, + double data, + google::protobuf::Message *msg) { - Sum *report_sum = static_cast(this->current_msg); + Sum *report_sum = static_cast(msg); NumberDataPoint *data_points = report_sum->add_data_points(); std::map::iterator it = this->label_map.find(label); std::string key; @@ -625,60 +683,85 @@ void RPCMetricsOTel::Collector::collect_counter_each(RPCVar *counter, data_points->set_time_unix_nano(this->current_timestamp_nano); } -void RPCMetricsOTel::Collector::collect_histogram_begin(RPCVar *histogram) +void RPCMetricsOTel::Collector::collect_histogram(RPCVar *var, + google::protobuf::Message *msg) { - Histogram *report_histogram = static_cast(this->current_msg); + HistogramVar *histogram = (HistogramVar *)var; + 
Histogram *report_histogram = static_cast(msg); + + const std::vector *bucket_counts = histogram->get_bucket_counts(); + const std::vector *bucket_boundaries = histogram->get_bucket_boundaries(); + HistogramDataPoint *data_points = report_histogram->add_data_points(); - this->current_msg = data_points; + + // begin data_points->set_time_unix_nano(this->current_timestamp_nano); + + size_t i = 0; + size_t current = 0; + + for (; i < bucket_boundaries->size(); i++) + { + // current += this->bucket_counts[i]; + current = bucket_counts->at(i); + + this->collect_histogram_each(bucket_boundaries->at(i), current, data_points); + } + + // current += this->bucket_counts[i]; + current = bucket_counts->at(i); + this->collect_histogram_each(DBL_MAX, current, data_points); + + // end + data_points->set_sum(histogram->get_sum()); + data_points->set_count(histogram->get_count()); } -void RPCMetricsOTel::Collector::collect_histogram_each(RPCVar *histogram, - double bucket_boundary, - size_t current_count) +void RPCMetricsOTel::Collector::collect_histogram_each(double bucket_boundary, + size_t current_count, + google::protobuf::Message *msg) { - HistogramDataPoint *data_points = static_cast(this->current_msg); + HistogramDataPoint *data_points = static_cast(msg); data_points->add_bucket_counts(current_count); if (bucket_boundary != DBL_MAX) data_points->add_explicit_bounds(bucket_boundary); } -void RPCMetricsOTel::Collector::collect_histogram_end(RPCVar *histogram, - double sum, - size_t count) +void RPCMetricsOTel::Collector::collect_summary(RPCVar *var, + google::protobuf::Message *msg) { - HistogramDataPoint *data_points = static_cast(this->current_msg); - data_points->set_sum(sum); - data_points->set_count(count); -} + SummaryVar *summary = (SummaryVar *)var; + Summary *report_summary = static_cast(msg); + + const std::vector& quantiles = summary->get_quantiles(); + const std::vector& quantile_out = summary->get_quantile_out(); -void 
RPCMetricsOTel::Collector::collect_summary_begin(RPCVar *summary) -{ - Summary *report_summary = static_cast(this->current_msg); SummaryDataPoint *data_points = report_summary->add_data_points(); - this->current_msg = data_points; + + // begin data_points->set_time_unix_nano(this->current_timestamp_nano); + for (size_t i = 0; i < quantiles.size(); i++) // equivalent to summary->get_size() + { + this->collect_summary_each(quantiles[i].quantile, quantile_out[i], + data_points); + } + + // end + data_points->set_sum(summary->get_sum()); + data_points->set_count(summary->get_count()); + summary->clear_quantile_out(); } -void RPCMetricsOTel::Collector::collect_summary_each(RPCVar *summary, - double quantile, - double quantile_out) +void RPCMetricsOTel::Collector::collect_summary_each(double quantile, + double quantile_out, + google::protobuf::Message *msg) { - SummaryDataPoint *data_points = static_cast(this->current_msg); + SummaryDataPoint *data_points = static_cast(msg); SummaryDataPoint::ValueAtQuantile *vaq = data_points->add_quantile_values(); vaq->set_quantile(quantile); vaq->set_value(quantile_out); } -void RPCMetricsOTel::Collector::collect_summary_end(RPCVar *summary, - double sum, - size_t count) -{ - SummaryDataPoint *data_points = static_cast(this->current_msg); - data_points->set_sum(sum); - data_points->set_count(count); -} - } // end namespace srpc diff --git a/src/module/rpc_metrics_filter.h b/src/module/rpc_metrics_filter.h index e6b825d1..b9c2ae94 100644 --- a/src/module/rpc_metrics_filter.h +++ b/src/module/rpc_metrics_filter.h @@ -69,6 +69,7 @@ class RPCMetricsFilter : public RPCFilter public: RPCMetricsFilter(); + RPCMetricsFilter(const std::string &name); protected: void reduce(std::unordered_map& out); @@ -141,7 +142,8 @@ class RPCFilterPolicy size_t report_interval_msec) : report_threshold(report_threshold), report_interval(report_interval_msec), - last_report_timestamp(0) + last_report_timestamp(0), + reporting(false) { } void
set_report_threshold(size_t threshold) @@ -159,11 +161,14 @@ class RPCFilterPolicy } bool report(size_t count); + void set_reporting(bool flag); private: size_t report_threshold; // metrics to report at most size_t report_interval; long long last_report_timestamp; + bool reporting; + std::mutex mutex; }; class RPCMetricsOTel : public RPCMetricsFilter @@ -186,11 +191,11 @@ class RPCMetricsOTel : public RPCMetricsFilter void set_scope_name(const std::string& name) { this->scope_name = name; } public: - RPCMetricsOTel(const std::string& url); + RPCMetricsOTel(const std::string &name, const std::string &url); - RPCMetricsOTel(const std::string& url, unsigned int redirect_max, - unsigned int retry_max, size_t report_threshold, - size_t report_interval); + RPCMetricsOTel(const std::string &name, const std::string& url, + unsigned int redirect_max, unsigned int retry_max, + size_t report_threshold, size_t report_interval); private: SubTask *create(RPCModuleData& data) override; @@ -206,40 +211,44 @@ class RPCMetricsOTel : public RPCMetricsFilter Collector() { } virtual ~Collector(); - void set_current_message(google::protobuf::Message *var_msg) - { - this->current_msg = var_msg; - } - void set_current_nano(unsigned long long ns) { this->current_timestamp_nano = ns; } - void collect_gauge(RPCVar *gauge, double data) override; - - void collect_counter_each(RPCVar *counter, const std::string& label, - double data) override; - - void collect_histogram_begin(RPCVar *histogram) override; - void collect_histogram_each(RPCVar *histogram, - double bucket_boudary, - size_t current_count) override; - void collect_histogram_end(RPCVar *histogram, double sum, - size_t count) override; - - void collect_summary_begin(RPCVar *summary) override; + // new api : fill var into msg + void collect_gauge(RPCVar *gauge, google::protobuf::Message *msg); + void collect_counter(RPCVar *counter, google::protobuf::Message *msg); + void collect_histogram(RPCVar *histogram, google::protobuf::Message 
*msg); + void collect_summary(RPCVar *summary, google::protobuf::Message *msg); + + void collect_counter_each(const std::string &label, double data, + google::protobuf::Message *msg); + void collect_histogram_each(double bucket_boudary, size_t current_count, + google::protobuf::Message *msg); + void collect_summary_each(double quantile, double quantile_out, + google::protobuf::Message *msg); + + // deprecated api + void collect_gauge(RPCVar *gauge, double data) override {} + void collect_counter_each(RPCVar *counter, const std::string &label, + double data) override {} + + void collect_histogram_begin(RPCVar *histogram) override {} + void collect_histogram_each(RPCVar *histogram, double bucket_boudary, + size_t current_count) override {} + void collect_histogram_end(RPCVar *histogram, double sum, size_t count) override{} + + void collect_summary_begin(RPCVar *summary) override {} void collect_summary_each(RPCVar *summary, double quantile, - double quantile_out) override; - void collect_summary_end(RPCVar *summary, double sum, - size_t count) override; + double quantile_out) override {} + void collect_summary_end(RPCVar *summary, double sum, size_t count) override {} private: void add_counter_label(const std::string& label); private: using LABEL_MAP = std::map; - google::protobuf::Message *current_msg; unsigned long long current_timestamp_nano; std::map label_map; }; @@ -247,11 +256,13 @@ class RPCMetricsOTel : public RPCMetricsFilter private: bool expose(google::protobuf::Message *metrics); +protected: + Collector collector; + private: std::string url; int redirect_max; int retry_max; - Collector collector; RPCFilterPolicy policy; std::atomic report_counts; std::map attributes; diff --git a/src/var/rpc_var.cc b/src/var/rpc_var.cc index ab18a8c4..2e9c1b73 100644 --- a/src/var/rpc_var.cc +++ b/src/var/rpc_var.cc @@ -458,7 +458,10 @@ RPCVar *SummaryVar::create(bool with_data) void SummaryVar::observe(double value) { + RPCVarLocal *local = RPCVarLocal::get_instance(); 
+ local->mutex.lock(); this->quantile_values.insert(value); + local->mutex.unlock(); } bool SummaryVar::reduce(const void *ptr, size_t sz) @@ -572,6 +575,20 @@ void TimedGaugeVar::increase() bucket.increase(); } +void TimedGaugeVar::set(double val) +{ + this->rotate(); + + for (auto &bucket : this->data_bucket) + bucket.set(val); +} + +double TimedGaugeVar::get() +{ + GaugeVar &bucket = this->rotate(); + return bucket.get(); +} + const void *TimedGaugeVar::get_data() { GaugeVar& bucket = this->rotate(); diff --git a/src/var/rpc_var.h b/src/var/rpc_var.h index e1352c21..02c336ff 100644 --- a/src/var/rpc_var.h +++ b/src/var/rpc_var.h @@ -291,6 +291,10 @@ class HistogramVar : public RPCVar double get_sum() const { return this->sum; } size_t get_count() const { return this->count; } + const std::vector *get_bucket_boundaries() const + { + return &this->bucket_boundaries; + } const std::vector *get_bucket_counts() const { return &this->bucket_counts; @@ -330,6 +334,18 @@ class SummaryVar : public RPCVar void reset() override { /* no TimedSummary so no reset for Summary */} + const std::vector& get_quantiles() const + { + return this->quantiles; + } + const std::vector& get_quantile_out() const + { + return this->quantile_out; + } + + // only for clear stack variable after filled into protobuf or out_string + void clear_quantile_out() { this->quantile_out.clear(); } + public: SummaryVar(const std::string& name, const std::string& help, const std::vector& quantile, @@ -402,6 +418,9 @@ class TimedGaugeVar : public GaugeVar, RPCTimeWindow Clock::duration duration, size_t bucket_num); // for collect void increase() override; + double get() override; + void set(double var) override; + // for reduce const void *get_data() override; RPCVar *create(bool with_data) override; diff --git a/tools/templates/common/config.json b/tools/templates/common/config.json index 9b09a000..be9d6ec1 100644 --- a/tools/templates/common/config.json +++ b/tools/templates/common/config.json @@ -47,6 
+47,7 @@ }, { "filter": "opentelemetry", + "filter_name": "otel_reporter1", "address": "http://opentelemetry.com:4389", "redirect_max": 0, "retry_max": 1, diff --git a/tools/templates/config/config_full.cc b/tools/templates/config/config_full.cc index ea2ebd8f..5345f3a7 100644 --- a/tools/templates/config/config_full.cc +++ b/tools/templates/config/config_full.cc @@ -321,6 +321,7 @@ void RPCConfig::load_metrics() if (it.has("address") == false) continue; + std::string name = it["filter_name"]; std::string url = it["address"]; unsigned int redirect_max = OTLP_HTTP_REDIRECT_MAX; @@ -337,7 +338,8 @@ void RPCConfig::load_metrics() if (it.has("report_interval_ms")) report_interval = it["report_interval_ms"]; - RPCMetricsOTel *filter = new RPCMetricsOTel(url, + RPCMetricsOTel *filter = new RPCMetricsOTel(name, + url, redirect_max, retry_max, report_threshold,