Skip to content

Commit e042e9f

Browse files
authored
[enhancement](brpc) remove client from brpc cache if the underlying channel has error (#47487)
### What problem does this PR solve? If the channel in the stub has error, it should not be reused any more, it should be removed from the cache.
1 parent 3509647 commit e042e9f

File tree

2 files changed

+132
-2
lines changed

2 files changed

+132
-2
lines changed

be/src/util/brpc_client_cache.h

+85-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040

4141
#include "common/compiler_util.h" // IWYU pragma: keep
4242
#include "common/config.h"
43+
#include "common/status.h"
4344
#include "runtime/exec_env.h"
45+
#include "service/backend_options.h"
4446
#include "util/dns_cache.h"
4547
#include "util/network_util.h"
4648

@@ -56,6 +58,80 @@ using StubMap = phmap::parallel_flat_hash_map<
5658

5759
namespace doris {
5860

61+
class FailureDetectClosure : public ::google::protobuf::Closure {
62+
public:
63+
FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st,
64+
::google::protobuf::RpcController* controller,
65+
::google::protobuf::Closure* done)
66+
: _channel_st(channel_st), _controller(controller), _done(done) {}
67+
68+
void Run() override {
69+
Defer defer {[&]() { delete this; }};
70+
// All brpc related API will use brpc::Controller, so that it is safe
71+
// to do static cast here.
72+
auto* cntl = static_cast<brpc::Controller*>(_controller);
73+
if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
74+
Status error_st = Status::NetworkError(
75+
"Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
76+
berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(),
77+
cntl->latency_us());
78+
LOG(WARNING) << error_st;
79+
_channel_st->update(error_st);
80+
}
81+
// Sometimes done == nullptr, for example hand_shake API.
82+
if (_done != nullptr) {
83+
_done->Run();
84+
}
85+
// _done->Run may throw exception, so that move delete this to Defer.
86+
// delete this;
87+
}
88+
89+
private:
90+
std::shared_ptr<AtomicStatus> _channel_st;
91+
::google::protobuf::RpcController* _controller;
92+
::google::protobuf::Closure* _done;
93+
};
94+
95+
// This channel will use FailureDetectClosure to wrap the original closure
96+
// If some non-recoverable rpc failure happens, it will save the error status in
97+
// _channel_st.
98+
// And brpc client cache will depend on it to detect if the client is health.
99+
class FailureDetectChannel : public ::brpc::Channel {
100+
public:
101+
FailureDetectChannel() : ::brpc::Channel() {
102+
_channel_st = std::make_shared<AtomicStatus>(); // default OK
103+
}
104+
void CallMethod(const google::protobuf::MethodDescriptor* method,
105+
google::protobuf::RpcController* controller,
106+
const google::protobuf::Message* request, google::protobuf::Message* response,
107+
google::protobuf::Closure* done) override {
108+
FailureDetectClosure* failure_detect_closure = nullptr;
109+
if (done != nullptr) {
110+
// If done == nullptr, then it means the call is sync call, so that should not
111+
// gen a failure detect closure for it. Or it will core.
112+
failure_detect_closure = new FailureDetectClosure(_channel_st, controller, done);
113+
}
114+
::brpc::Channel::CallMethod(method, controller, request, response, failure_detect_closure);
115+
// Done == nullptr, it is a sync call, should also deal with the bad channel.
116+
if (done == nullptr) {
117+
auto* cntl = static_cast<brpc::Controller*>(controller);
118+
if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
119+
Status error_st = Status::NetworkError(
120+
"Failed to send brpc, error={}, error_text={}, client: {}, latency = {}",
121+
berror(cntl->ErrorCode()), cntl->ErrorText(),
122+
BackendOptions::get_localhost(), cntl->latency_us());
123+
LOG(WARNING) << error_st;
124+
_channel_st->update(error_st);
125+
}
126+
}
127+
}
128+
129+
std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; }
130+
131+
private:
132+
std::shared_ptr<AtomicStatus> _channel_st;
133+
};
134+
59135
template <class T>
60136
class BrpcClientCache {
61137
public:
@@ -99,7 +175,14 @@ class BrpcClientCache {
99175
auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
100176
if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
101177
DCHECK(stub_ptr != nullptr);
102-
return stub_ptr;
178+
// All client created from this cache will use FailureDetectChannel, so it is
179+
// safe to do static cast here.
180+
// Check if the base channel is OK, if not ignore the stub and create new one.
181+
if (static_cast<FailureDetectChannel*>(stub_ptr->channel())->channel_status()->ok()) {
182+
return stub_ptr;
183+
} else {
184+
_stub_map.erase(host_port);
185+
}
103186
}
104187

105188
// new one stub and insert into map
@@ -148,7 +231,7 @@ class BrpcClientCache {
148231
options.timeout_ms = 2000;
149232
options.max_retry = 10;
150233

151-
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
234+
std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel());
152235
int ret_code = 0;
153236
if (host_port.find("://") == std::string::npos) {
154237
ret_code = channel->Init(host_port.c_str(), &options);

be/test/util/brpc_client_cache_test.cpp

+47
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,51 @@ TEST_F(BrpcClientCacheTest, invalid) {
5656
EXPECT_EQ(nullptr, stub1);
5757
}
5858

59+
TEST_F(BrpcClientCacheTest, failure) {
60+
BrpcClientCache<PBackendService_Stub> cache;
61+
TNetworkAddress address;
62+
address.hostname = "127.0.0.1";
63+
address.port = 123;
64+
std::shared_ptr<PBackendService_Stub> stub1 = cache.get_client(address);
65+
EXPECT_NE(nullptr, stub1);
66+
std::shared_ptr<PBackendService_Stub> stub2 = cache.get_client(address);
67+
EXPECT_NE(nullptr, stub2);
68+
// The channel is ok, so that the stub is the same
69+
EXPECT_EQ(stub1, stub2);
70+
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub1->channel())->channel_status()->ok());
71+
72+
// update channel st to error, will get a new stub
73+
static_cast<FailureDetectChannel*>(stub1->channel())
74+
->channel_status()
75+
->update(Status::NetworkError("test brpc error"));
76+
std::shared_ptr<PBackendService_Stub> stub3 = cache.get_client(address);
77+
EXPECT_NE(nullptr, stub3);
78+
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());
79+
// Then will get a new brpc stub not the previous one.
80+
EXPECT_NE(stub2, stub3);
81+
// The previous channel is not ok.
82+
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub2->channel())->channel_status()->ok());
83+
84+
// Call handshake method, it will trigger host is down error. It is a sync call, not use closure.
85+
cache.available(stub3, address.hostname, address.port);
86+
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());
87+
88+
std::shared_ptr<PBackendService_Stub> stub4 = cache.get_client(address);
89+
EXPECT_NE(nullptr, stub4);
90+
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());
91+
92+
// Call handshake method, it will trigger host is down error. It is a async all, will use closure.
93+
std::string message = "hello doris!";
94+
PHandShakeRequest request;
95+
request.set_hello(message);
96+
PHandShakeResponse response;
97+
brpc::Controller cntl4;
98+
stub4->hand_shake(&cntl4, &request, &response, brpc::DoNothing());
99+
brpc::Join(cntl4.call_id());
100+
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());
101+
102+
// Check map size is 1
103+
EXPECT_EQ(1, cache.size());
104+
}
105+
59106
} // namespace doris

0 commit comments

Comments
 (0)