Skip to content

Commit

Permalink
Merge pull request #356 from holmes1412/master
Browse files Browse the repository at this point in the history
rpc_var : fix concurrent bug in RPCVarLocal and CounterVar
  • Loading branch information
Barenboim authored Jan 11, 2024
2 parents dfecdae + fbcd451 commit 66e2f2c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/module/rpc_metrics_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ RPCMetricsFilter::RPCMetricsFilter() :
bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
{
this->gauge(METRICS_REQUEST_COUNT)->increase();
this->counter(METRICS_REQUEST_METHOD)->add(
this->counter(METRICS_REQUEST_METHOD)->increase(
{{"service", data[OTLP_SERVICE_NAME]},
{"method", data[OTLP_METHOD_NAME] }})->increase();
{"method", data[OTLP_METHOD_NAME] }});
this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));

return true;
Expand All @@ -75,9 +75,9 @@ bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
bool RPCMetricsFilter::server_end(SubTask *task, RPCModuleData& data)
{
this->gauge(METRICS_REQUEST_COUNT)->increase();
this->counter(METRICS_REQUEST_METHOD)->add(
this->counter(METRICS_REQUEST_METHOD)->increase(
{{"service", data[OTLP_SERVICE_NAME]},
{"method", data[OTLP_METHOD_NAME] }})->increase();
{"method", data[OTLP_METHOD_NAME] }});
this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));

return true;
Expand Down
32 changes: 32 additions & 0 deletions src/var/rpc_var.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ RPCVar *RPCVarFactory::var(const std::string& name)
if (var)
{
new_var = var->create(false);
local->mutex.lock(); //agains reduce() for loop
local->add(name, new_var);
local->mutex.unlock();
return new_var;
}

Expand Down Expand Up @@ -232,6 +234,9 @@ bool CounterVar::label_to_str(const LABEL_MAP& labels, std::string& str)
return true;
}

// [deprecate]
// This cannot guarantee the GaugeVar still exists
// because global will counter->reset() and delete the internal GaugeVar
GaugeVar *CounterVar::add(const LABEL_MAP& labels)
{
std::string label_str;
Expand All @@ -253,6 +258,33 @@ GaugeVar *CounterVar::add(const LABEL_MAP& labels)
return var;
}

void CounterVar::increase(const LABEL_MAP& labels)
{
std::string label_str;
GaugeVar *var;

if (!this->label_to_str(labels, label_str))
return;

RPCVarLocal *local = RPCVarLocal::get_instance();
local->mutex.lock(); // against reset()

auto it = this->data.find(label_str);

if (it == this->data.end())
{
var = new GaugeVar(label_str, "");
this->data.insert(std::make_pair(label_str, var));
}
else
var = it->second;

var->increase();
local->mutex.unlock();

return;
}

bool CounterVar::reduce(const void *ptr, size_t)
{
std::unordered_map<std::string, GaugeVar *> *data;
Expand Down
1 change: 1 addition & 0 deletions src/var/rpc_var.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class CounterVar : public RPCVar
public:
using LABEL_MAP = std::map<std::string, std::string>;
GaugeVar *add(const LABEL_MAP& labels);
void increase(const LABEL_MAP& labels);

RPCVar *create(bool with_data) override;
bool reduce(const void *ptr, size_t sz) override;
Expand Down

0 comments on commit 66e2f2c

Please sign in to comment.