Skip to content

Commit eded3b6

Browse files
authored
client channel: record call completion even if recv_trailing_metadata was not started (grpc#29198)
* client channel: record call completion even if recv_trailing_metadata was not started
* add test
* add test for opencensus filter
* remove unnecessary code
1 parent 8f3cd54 commit eded3b6

File tree

10 files changed

+186
-76
lines changed

10 files changed

+186
-76
lines changed

src/core/ext/filters/client_channel/client_channel.cc

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2427,6 +2427,7 @@ class ClientChannel::LoadBalancedCall::Metadata
24272427
explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {}
24282428

24292429
void Add(absl::string_view key, absl::string_view value) override {
2430+
if (batch_ == nullptr) return;
24302431
// Gross, egregious hack to support legacy grpclb behavior.
24312432
// TODO(ctiller): Use a promise context for this once that plumbing is done.
24322433
if (key == GrpcLbClientStatsMetadata::key()) {
@@ -2447,13 +2448,15 @@ class ClientChannel::LoadBalancedCall::Metadata
24472448

24482449
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
24492450
override {
2451+
if (batch_ == nullptr) return {};
24502452
Encoder encoder;
24512453
batch_->Encode(&encoder);
24522454
return encoder.Take();
24532455
}
24542456

24552457
absl::optional<absl::string_view> Lookup(absl::string_view key,
24562458
std::string* buffer) const override {
2459+
if (batch_ == nullptr) return absl::nullopt;
24572460
return batch_->GetStringValue(key, buffer);
24582461
}
24592462

@@ -2524,7 +2527,8 @@ class ClientChannel::LoadBalancedCall::BackendMetricAccessor
25242527
: lb_call_(lb_call) {}
25252528

25262529
const BackendMetricData* GetBackendMetricData() override {
2527-
if (lb_call_->backend_metric_data_ == nullptr) {
2530+
if (lb_call_->backend_metric_data_ == nullptr &&
2531+
lb_call_->recv_trailing_metadata_ != nullptr) {
25282532
if (const auto* md = lb_call_->recv_trailing_metadata_->get_pointer(
25292533
XEndpointLoadMetricsBinMetadata())) {
25302534
lb_call_->backend_metric_data_ =
@@ -2594,6 +2598,12 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
25942598
}
25952599

25962600
void ClientChannel::LoadBalancedCall::Orphan() {
2601+
// If the recv_trailing_metadata op was never started, then notify
2602+
// about call completion here, as best we can. We assume status
2603+
// CANCELLED in this case.
2604+
if (recv_trailing_metadata_ == nullptr) {
2605+
RecordCallCompletion(absl::CancelledError("call cancelled"));
2606+
}
25972607
// Compute latency and report it to the tracer.
25982608
if (call_attempt_tracer_ != nullptr) {
25992609
gpr_timespec latency =
@@ -2905,22 +2915,7 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
29052915
status = absl::Status(static_cast<absl::StatusCode>(code), message);
29062916
}
29072917
}
2908-
// If we have a tracer, notify it.
2909-
if (self->call_attempt_tracer_ != nullptr) {
2910-
self->call_attempt_tracer_->RecordReceivedTrailingMetadata(
2911-
status, self->recv_trailing_metadata_,
2912-
*self->transport_stream_stats_);
2913-
}
2914-
// If the LB policy requested a callback for trailing metadata, invoke
2915-
// the callback.
2916-
if (self->lb_subchannel_call_tracker_ != nullptr) {
2917-
Metadata trailing_metadata(self->recv_trailing_metadata_);
2918-
BackendMetricAccessor backend_metric_accessor(self);
2919-
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
2920-
status, &trailing_metadata, &backend_metric_accessor};
2921-
self->lb_subchannel_call_tracker_->Finish(args);
2922-
self->lb_subchannel_call_tracker_.reset();
2923-
}
2918+
self->RecordCallCompletion(status);
29242919
}
29252920
// Chain to original callback.
29262921
if (self->failure_error_ != GRPC_ERROR_NONE) {
@@ -2933,6 +2928,25 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
29332928
error);
29342929
}
29352930

2931+
void ClientChannel::LoadBalancedCall::RecordCallCompletion(
2932+
absl::Status status) {
2933+
// If we have a tracer, notify it.
2934+
if (call_attempt_tracer_ != nullptr) {
2935+
call_attempt_tracer_->RecordReceivedTrailingMetadata(
2936+
status, recv_trailing_metadata_, transport_stream_stats_);
2937+
}
2938+
// If the LB policy requested a callback for trailing metadata, invoke
2939+
// the callback.
2940+
if (lb_subchannel_call_tracker_ != nullptr) {
2941+
Metadata trailing_metadata(recv_trailing_metadata_);
2942+
BackendMetricAccessor backend_metric_accessor(this);
2943+
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
2944+
status, &trailing_metadata, &backend_metric_accessor};
2945+
lb_subchannel_call_tracker_->Finish(args);
2946+
lb_subchannel_call_tracker_.reset();
2947+
}
2948+
}
2949+
29362950
void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
29372951
SubchannelCall::Args call_args = {
29382952
std::move(connected_subchannel_), pollent_, path_.Ref(), /*start_time=*/0,

src/core/ext/filters/client_channel/client_channel.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,8 @@ class ClientChannel::LoadBalancedCall
433433
static void RecvMessageReady(void* arg, grpc_error_handle error);
434434
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
435435

436+
void RecordCallCompletion(absl::Status status);
437+
436438
void CreateSubchannelCall();
437439
// Invoked when a pick is completed, on both success or failure.
438440
static void PickDone(void* arg, grpc_error_handle error);

src/core/lib/channel/call_tracer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,12 @@ class CallTracer {
5959
virtual void RecordReceivedInitialMetadata(
6060
grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0;
6161
virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0;
62+
// If the call was cancelled before the recv_trailing_metadata op
63+
// was started, recv_trailing_metadata and transport_stream_stats
64+
// will be null.
6265
virtual void RecordReceivedTrailingMetadata(
6366
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
64-
const grpc_transport_stream_stats& transport_stream_stats) = 0;
67+
const grpc_transport_stream_stats* transport_stream_stats) = 0;
6568
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
6669
// Should be the last API call to the object. Once invoked, the tracer
6770
// library is free to destroy the object.

src/cpp/ext/filters/census/client_filter.cc

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
9595

9696
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
9797
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
98-
uint32_t /* flags */) {
98+
uint32_t /*flags*/) {
9999
char tracing_buf[kMaxTraceContextLen];
100100
size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
101101
kMaxTraceContextLen);
@@ -114,12 +114,12 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
114114
}
115115

116116
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
117-
const grpc_core::ByteStream& /* send_message */) {
117+
const grpc_core::ByteStream& /*send_message*/) {
118118
++sent_message_count_;
119119
}
120120

121121
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
122-
const grpc_core::ByteStream& /* recv_message */) {
122+
const grpc_core::ByteStream& /*recv_message*/) {
123123
++recv_message_count_;
124124
}
125125

@@ -140,21 +140,25 @@ void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
140140
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
141141
RecordReceivedTrailingMetadata(
142142
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
143-
const grpc_transport_stream_stats& transport_stream_stats) {
144-
FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time_);
145-
const uint64_t request_size = transport_stream_stats.outgoing.data_bytes;
146-
const uint64_t response_size = transport_stream_stats.incoming.data_bytes;
143+
const grpc_transport_stream_stats* transport_stream_stats) {
144+
status_code_ = status.code();
145+
if (recv_trailing_metadata == nullptr || transport_stream_stats == nullptr) {
146+
return;
147+
}
148+
uint64_t elapsed_time = 0;
149+
FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time);
147150
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
148151
context_.tags().tags();
149152
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
150-
status_code_ = status.code();
151153
std::string final_status = absl::StatusCodeToString(status_code_);
152154
tags.emplace_back(ClientStatusTagKey(), final_status);
153155
::opencensus::stats::Record(
154-
{{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
155-
{RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
156+
{{RpcClientSentBytesPerRpc(),
157+
static_cast<double>(transport_stream_stats->outgoing.data_bytes)},
158+
{RpcClientReceivedBytesPerRpc(),
159+
static_cast<double>(transport_stream_stats->incoming.data_bytes)},
156160
{RpcClientServerLatency(),
157-
ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}},
161+
ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time))}},
158162
tags);
159163
}
160164

@@ -165,7 +169,7 @@ void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
165169
}
166170

167171
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
168-
const gpr_timespec& /* latency */) {
172+
const gpr_timespec& /*latency*/) {
169173
double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
170174
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
171175
context_.tags().tags();

src/cpp/ext/filters/census/open_census_call_tracer.h

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,23 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
3333
OpenCensusCallAttemptTracer(OpenCensusCallTracer* parent,
3434
uint64_t attempt_num, bool is_transparent_retry,
3535
bool arena_allocated);
36-
void RecordSendInitialMetadata(
37-
grpc_metadata_batch* /* send_initial_metadata */,
38-
uint32_t /* flags */) override;
39-
void RecordOnDoneSendInitialMetadata(gpr_atm* /* peer_string */) override {}
36+
void RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
37+
uint32_t /*flags*/) override;
38+
void RecordOnDoneSendInitialMetadata(gpr_atm* /*peer_string*/) override {}
4039
void RecordSendTrailingMetadata(
41-
grpc_metadata_batch* /* send_trailing_metadata */) override {}
40+
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
4241
void RecordSendMessage(
43-
const grpc_core::ByteStream& /* send_message */) override;
42+
const grpc_core::ByteStream& /*send_message*/) override;
4443
void RecordReceivedInitialMetadata(
45-
grpc_metadata_batch* /* recv_initial_metadata */,
46-
uint32_t /* flags */) override {}
44+
grpc_metadata_batch* /*recv_initial_metadata*/,
45+
uint32_t /*flags*/) override {}
4746
void RecordReceivedMessage(
48-
const grpc_core::ByteStream& /* recv_message */) override;
47+
const grpc_core::ByteStream& /*recv_message*/) override;
4948
void RecordReceivedTrailingMetadata(
50-
absl::Status /* status */, grpc_metadata_batch* recv_trailing_metadata,
51-
const grpc_transport_stream_stats& /* transport_stream_stats */)
52-
override;
49+
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
50+
const grpc_transport_stream_stats* transport_stream_stats) override;
5351
void RecordCancel(grpc_error_handle cancel_error) override;
54-
void RecordEnd(const gpr_timespec& /* latency */) override;
52+
void RecordEnd(const gpr_timespec& /*latency*/) override;
5553

5654
CensusContext* context() { return &context_; }
5755

@@ -65,8 +63,6 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
6563
CensusContext context_;
6664
// Start time (for measuring latency).
6765
absl::Time start_time_;
68-
// Server elapsed time in nanoseconds.
69-
uint64_t elapsed_time_ = 0;
7066
// Number of messages in this RPC.
7167
uint64_t recv_message_count_ = 0;
7268
uint64_t sent_message_count_ = 0;

test/core/util/test_lb_policies.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
281281

282282
void Finish(FinishArgs args) override {
283283
TrailingMetadataArgsSeen args_seen;
284+
args_seen.status = args.status;
284285
args_seen.backend_metric_data =
285286
args.backend_metric_accessor->GetBackendMetricData();
286287
args_seen.metadata = args.trailing_metadata->TestOnlyCopyToVector();

test/core/util/test_lb_policies.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void RegisterTestPickArgsLoadBalancingPolicy(
3636
TestPickArgsCallback cb, const char* delegate_policy_name = "pick_first");
3737

3838
struct TrailingMetadataArgsSeen {
39+
absl::Status status;
3940
const LoadBalancingPolicy::BackendMetricAccessor::BackendMetricData*
4041
backend_metric_data;
4142
MetadataVector metadata;

0 commit comments

Comments
 (0)