Skip to content

Commit e8eac7c

Browse files
author
Muxi Yan
committed
Merge remote-tracking branch 'upstream/master' into fix-authorizer
2 parents e5453b1 + 3278bdc commit e8eac7c

37 files changed

+651
-388
lines changed

examples/BUILD

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ cc_binary(
101101

102102
cc_binary(
103103
name = "keyvaluestore_client",
104-
srcs = ["cpp/keyvaluestore/client.cc"],
104+
srcs = ["cpp/keyvaluestore/caching_interceptor.h",
105+
"cpp/keyvaluestore/client.cc"],
105106
defines = ["BAZEL_BUILD"],
106107
deps = [":keyvaluestore", "//:grpc++"],
107108
)
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
*
3+
* Copyright 2018 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
#include <map>
20+
21+
#include <grpcpp/support/client_interceptor.h>
22+
23+
#ifdef BAZEL_BUILD
24+
#include "examples/protos/keyvaluestore.grpc.pb.h"
25+
#else
26+
#include "keyvaluestore.grpc.pb.h"
27+
#endif
28+
29+
// This is a naive implementation of a cache. A new cache is for each call. For
30+
// each new key request, the key is first searched in the map and if found, the
31+
// interceptor fills in the return value without making a request to the server.
32+
// Only if the key is not found in the cache do we make a request.
33+
class CachingInterceptor : public grpc::experimental::Interceptor {
34+
public:
35+
CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {}
36+
37+
void Intercept(
38+
::grpc::experimental::InterceptorBatchMethods* methods) override {
39+
bool hijack = false;
40+
if (methods->QueryInterceptionHookPoint(
41+
grpc::experimental::InterceptionHookPoints::
42+
PRE_SEND_INITIAL_METADATA)) {
43+
// Hijack all calls
44+
hijack = true;
45+
// Create a stream on which this interceptor can make requests
46+
stub_ = keyvaluestore::KeyValueStore::NewStub(
47+
methods->GetInterceptedChannel());
48+
stream_ = stub_->GetValues(&context_);
49+
}
50+
if (methods->QueryInterceptionHookPoint(
51+
grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
52+
// We know that clients perform a Read and a Write in a loop, so we don't
53+
// need to maintain a list of the responses.
54+
std::string requested_key;
55+
const keyvaluestore::Request* req_msg =
56+
static_cast<const keyvaluestore::Request*>(methods->GetSendMessage());
57+
if (req_msg != nullptr) {
58+
requested_key = req_msg->key();
59+
} else {
60+
// The non-serialized form would not be available in certain scenarios,
61+
// so add a fallback
62+
keyvaluestore::Request req_msg;
63+
auto* buffer = methods->GetSerializedSendMessage();
64+
auto copied_buffer = *buffer;
65+
GPR_ASSERT(
66+
grpc::SerializationTraits<keyvaluestore::Request>::Deserialize(
67+
&copied_buffer, &req_msg)
68+
.ok());
69+
requested_key = req_msg.key();
70+
}
71+
72+
// Check if the key is present in the map
73+
auto search = cached_map_.find(requested_key);
74+
if (search != cached_map_.end()) {
75+
std::cout << "Key " << requested_key << "found in map";
76+
response_ = search->second;
77+
} else {
78+
std::cout << "Key " << requested_key << "not found in cache";
79+
// Key was not found in the cache, so make a request
80+
keyvaluestore::Request req;
81+
req.set_key(requested_key);
82+
stream_->Write(req);
83+
keyvaluestore::Response resp;
84+
stream_->Read(&resp);
85+
response_ = resp.value();
86+
// Insert the pair in the cache for future requests
87+
cached_map_.insert({requested_key, response_});
88+
}
89+
}
90+
if (methods->QueryInterceptionHookPoint(
91+
grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) {
92+
stream_->WritesDone();
93+
}
94+
if (methods->QueryInterceptionHookPoint(
95+
grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) {
96+
keyvaluestore::Response* resp =
97+
static_cast<keyvaluestore::Response*>(methods->GetRecvMessage());
98+
resp->set_value(response_);
99+
}
100+
if (methods->QueryInterceptionHookPoint(
101+
grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) {
102+
auto* status = methods->GetRecvStatus();
103+
*status = grpc::Status::OK;
104+
}
105+
// One of Hijack or Proceed always needs to be called to make progress.
106+
if (hijack) {
107+
// Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in
108+
// the hook points
109+
methods->Hijack();
110+
} else {
111+
// Proceed is an indicator that the interceptor is done intercepting the
112+
// batch.
113+
methods->Proceed();
114+
}
115+
}
116+
117+
private:
118+
grpc::ClientContext context_;
119+
std::unique_ptr<keyvaluestore::KeyValueStore::Stub> stub_;
120+
std::unique_ptr<
121+
grpc::ClientReaderWriter<keyvaluestore::Request, keyvaluestore::Response>>
122+
stream_;
123+
std::map<std::string, std::string> cached_map_;
124+
std::string response_;
125+
};
126+
127+
class CachingInterceptorFactory
128+
: public grpc::experimental::ClientInterceptorFactoryInterface {
129+
public:
130+
grpc::experimental::Interceptor* CreateClientInterceptor(
131+
grpc::experimental::ClientRpcInfo* info) override {
132+
return new CachingInterceptor(info);
133+
}
134+
};

examples/cpp/keyvaluestore/client.cc

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
#include <grpcpp/grpcpp.h>
2525

26+
#include "caching_interceptor.h"
27+
2628
#ifdef BAZEL_BUILD
2729
#include "examples/protos/keyvaluestore.grpc.pb.h"
2830
#else
@@ -77,9 +79,20 @@ int main(int argc, char** argv) {
7779
// are created. This channel models a connection to an endpoint (in this case,
7880
// localhost at port 50051). We indicate that the channel isn't authenticated
7981
// (use of InsecureChannelCredentials()).
80-
KeyValueStoreClient client(grpc::CreateChannel(
81-
"localhost:50051", grpc::InsecureChannelCredentials()));
82-
std::vector<std::string> keys = {"key1", "key2", "key3", "key4", "key5"};
82+
// In this example, we are using a cache which has been added in as an
83+
// interceptor.
84+
grpc::ChannelArguments args;
85+
std::vector<
86+
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
87+
interceptor_creators;
88+
interceptor_creators.push_back(std::unique_ptr<CachingInterceptorFactory>(
89+
new CachingInterceptorFactory()));
90+
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
91+
"localhost:50051", grpc::InsecureChannelCredentials(), args,
92+
std::move(interceptor_creators));
93+
KeyValueStoreClient client(channel);
94+
std::vector<std::string> keys = {"key1", "key2", "key3", "key4",
95+
"key5", "key1", "key2", "key4"};
8396
client.GetValues(keys);
8497

8598
return 0;

src/core/ext/transport/chttp2/transport/chttp2_transport.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -968,19 +968,19 @@ static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
968968
get better latency overall if we switch writing work elsewhere and continue
969969
with application work above */
970970
if (!t->is_first_write_in_batch) {
971-
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
971+
return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
972972
}
973973
/* equivalently, if it's a partial write, we *know* we're going to be taking a
974974
thread jump to write it because of the above, may as well do so
975975
immediately */
976976
if (partial_write) {
977-
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
977+
return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
978978
}
979979
switch (t->opt_target) {
980980
case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
981981
/* executor gives us the largest probability of being able to batch a
982982
* write with others on this transport */
983-
return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
983+
return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT);
984984
case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
985985
return grpc_schedule_on_exec_ctx;
986986
}

src/core/lib/iomgr/combiner.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ grpc_combiner* grpc_combiner_create(void) {
8383
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
8484
gpr_mpscq_init(&lock->queue);
8585
grpc_closure_list_init(&lock->final_list);
86-
GRPC_CLOSURE_INIT(&lock->offload, offload, lock,
87-
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
86+
GRPC_CLOSURE_INIT(
87+
&lock->offload, offload, lock,
88+
grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT));
8889
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock));
8990
return lock;
9091
}
@@ -235,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx() {
235236
// 3. the DEFAULT executor is threaded
236237
// 4. the current thread is not a worker for any background poller
237238
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
238-
grpc_executor_is_threaded() &&
239+
grpc_core::Executor::IsThreadedDefault() &&
239240
!grpc_iomgr_is_any_background_poller_thread()) {
240241
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
241242
// this execution context wants to move on: schedule remaining work to be

0 commit comments

Comments
 (0)