Skip to content

Commit 01ab821

Browse files
authored
interop-test: add orca test case (grpc#9079)
add interop testing `orca_per_rpc` and `orca_oob`
1 parent 62119b2 commit 01ab821

File tree

10 files changed

+293
-6
lines changed

10 files changed

+293
-6
lines changed

android-interop-testing/build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ dependencies {
7474
exclude group: 'org.apache.httpcomponents'
7575
}
7676

77+
implementation (project(':grpc-services')) {
78+
exclude group: 'com.google.protobuf'
79+
}
80+
7781
compileOnly libraries.javax_annotation
7882

7983
androidTestImplementation 'androidx.test.ext:junit:1.1.3',

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

+49
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
9393
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
9494
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
95+
import io.grpc.testing.integration.Messages.TestOrcaReport;
9596
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
9697
import io.opencensus.stats.Measure;
9798
import io.opencensus.stats.Measure.MeasureDouble;
@@ -188,6 +189,11 @@ public abstract class AbstractInteropTest {
188189
private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
189190
new LinkedBlockingQueue<>();
190191

192+
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
193+
ORCA_RPC_REPORT_KEY = CallOptions.Key.create("orca-rpc-report");
194+
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
195+
ORCA_OOB_REPORT_KEY = CallOptions.Key.create("orca-oob-report");
196+
191197
private static final class ServerStreamTracerInfo {
192198
final String fullMethodName;
193199
final InteropServerStreamTracer tracer;
@@ -1731,6 +1737,49 @@ public void getServerAddressAndLocalAddressFromClient() {
17311737
assertNotNull(obtainLocalClientAddr());
17321738
}
17331739

1740+
/**
1741+
* Test backend metrics per query reporting: expect the test client LB policy to receive load
1742+
* reports.
1743+
*/
1744+
public void testOrcaPerRpc() throws Exception {
1745+
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
1746+
TestOrcaReport answer = TestOrcaReport.newBuilder()
1747+
.setCpuUtilization(0.8210)
1748+
.setMemoryUtilization(0.5847)
1749+
.putRequestCost("cost", 3456.32)
1750+
.putUtilization("util", 0.30499)
1751+
.build();
1752+
blockingStub.withOption(ORCA_RPC_REPORT_KEY, reportHolder).unaryCall(
1753+
SimpleRequest.newBuilder().setOrcaPerRpcReport(answer).build());
1754+
assertThat(reportHolder.get()).isEqualTo(answer);
1755+
}
1756+
1757+
/**
1758+
* Test backend metrics OOB reporting: expect the test client LB policy to receive load reports.
1759+
*/
1760+
public void testOrcaOob() throws Exception {
1761+
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
1762+
TestOrcaReport answer = TestOrcaReport.newBuilder()
1763+
.setCpuUtilization(0.8210)
1764+
.setMemoryUtilization(0.5847)
1765+
.putUtilization("util", 0.30499)
1766+
.build();
1767+
blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build());
1768+
Thread.sleep(1500);
1769+
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
1770+
assertThat(reportHolder.get()).isEqualTo(answer);
1771+
1772+
answer = TestOrcaReport.newBuilder()
1773+
.setCpuUtilization(0.29309)
1774+
.setMemoryUtilization(0.2)
1775+
.putUtilization("util", 100.2039)
1776+
.build();
1777+
blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build());
1778+
Thread.sleep(1500);
1779+
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
1780+
assertThat(reportHolder.get()).isEqualTo(answer);
1781+
}
1782+
17341783
/** Sends a large unary rpc with service account credentials. */
17351784
public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)
17361785
throws Exception {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright 2022 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.testing.integration;
18+
19+
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_OOB_REPORT_KEY;
20+
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_RPC_REPORT_KEY;
21+
22+
import io.grpc.ConnectivityState;
23+
import io.grpc.LoadBalancer;
24+
import io.grpc.LoadBalancerProvider;
25+
import io.grpc.LoadBalancerRegistry;
26+
import io.grpc.testing.integration.Messages.TestOrcaReport;
27+
import io.grpc.util.ForwardingLoadBalancer;
28+
import io.grpc.util.ForwardingLoadBalancerHelper;
29+
import io.grpc.xds.orca.OrcaOobUtil;
30+
import io.grpc.xds.orca.OrcaPerRequestUtil;
31+
import io.grpc.xds.shaded.com.github.xds.data.orca.v3.OrcaLoadReport;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
35+
/**
36+
* Implements a test LB policy that receives ORCA load reports.
37+
*/
38+
final class CustomBackendMetricsLoadBalancerProvider extends LoadBalancerProvider {
39+
40+
static final String TEST_ORCA_LB_POLICY_NAME = "test_backend_metrics_load_balancer";
41+
private volatile TestOrcaReport latestOobReport;
42+
43+
@Override
44+
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
45+
return new CustomBackendMetricsLoadBalancer(helper);
46+
}
47+
48+
@Override
49+
public boolean isAvailable() {
50+
return true;
51+
}
52+
53+
@Override
54+
public int getPriority() {
55+
return 0;
56+
}
57+
58+
@Override
59+
public String getPolicyName() {
60+
return TEST_ORCA_LB_POLICY_NAME;
61+
}
62+
63+
private final class CustomBackendMetricsLoadBalancer extends ForwardingLoadBalancer {
64+
private LoadBalancer delegate;
65+
66+
public CustomBackendMetricsLoadBalancer(Helper helper) {
67+
this.delegate = LoadBalancerRegistry.getDefaultRegistry()
68+
.getProvider("pick_first")
69+
.newLoadBalancer(new CustomBackendMetricsLoadBalancerHelper(helper));
70+
}
71+
72+
@Override
73+
public LoadBalancer delegate() {
74+
return delegate;
75+
}
76+
77+
private final class CustomBackendMetricsLoadBalancerHelper
78+
extends ForwardingLoadBalancerHelper {
79+
private final Helper orcaHelper;
80+
81+
public CustomBackendMetricsLoadBalancerHelper(Helper helper) {
82+
this.orcaHelper = OrcaOobUtil.newOrcaReportingHelper(helper);
83+
}
84+
85+
@Override
86+
public Subchannel createSubchannel(CreateSubchannelArgs args) {
87+
Subchannel subchannel = super.createSubchannel(args);
88+
OrcaOobUtil.setListener(subchannel, new OrcaOobUtil.OrcaOobReportListener() {
89+
@Override
90+
public void onLoadReport(OrcaLoadReport orcaLoadReport) {
91+
latestOobReport = fromOrcaLoadReport(orcaLoadReport);
92+
}
93+
},
94+
OrcaOobUtil.OrcaReportingConfig.newBuilder()
95+
.setReportInterval(1, TimeUnit.SECONDS)
96+
.build()
97+
);
98+
return subchannel;
99+
}
100+
101+
@Override
102+
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
103+
delegate().updateBalancingState(newState, new MayReportLoadPicker(newPicker));
104+
}
105+
106+
@Override
107+
public Helper delegate() {
108+
return orcaHelper;
109+
}
110+
}
111+
112+
private final class MayReportLoadPicker extends SubchannelPicker {
113+
private SubchannelPicker delegate;
114+
115+
public MayReportLoadPicker(SubchannelPicker delegate) {
116+
this.delegate = delegate;
117+
}
118+
119+
@Override
120+
public PickResult pickSubchannel(PickSubchannelArgs args) {
121+
PickResult result = delegate.pickSubchannel(args);
122+
if (result.getSubchannel() == null) {
123+
return result;
124+
}
125+
AtomicReference<TestOrcaReport> reportRef =
126+
args.getCallOptions().getOption(ORCA_OOB_REPORT_KEY);
127+
if (reportRef != null) {
128+
reportRef.set(latestOobReport);
129+
}
130+
131+
return PickResult.withSubchannel(
132+
result.getSubchannel(),
133+
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
134+
new OrcaPerRequestUtil.OrcaPerRequestReportListener() {
135+
@Override
136+
public void onLoadReport(OrcaLoadReport orcaLoadReport) {
137+
AtomicReference<TestOrcaReport> reportRef =
138+
args.getCallOptions().getOption(ORCA_RPC_REPORT_KEY);
139+
reportRef.set(fromOrcaLoadReport(orcaLoadReport));
140+
}
141+
}));
142+
}
143+
}
144+
}
145+
146+
private static TestOrcaReport fromOrcaLoadReport(OrcaLoadReport orcaLoadReport) {
147+
return TestOrcaReport.newBuilder()
148+
.setCpuUtilization(orcaLoadReport.getCpuUtilization())
149+
.setMemoryUtilization(orcaLoadReport.getMemUtilization())
150+
.putAllRequestCost(orcaLoadReport.getRequestCostMap())
151+
.putAllUtilization(orcaLoadReport.getUtilizationMap())
152+
.build();
153+
}
154+
}

interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ public enum TestCases {
5656
VERY_LARGE_REQUEST("very large request"),
5757
PICK_FIRST_UNARY("all requests are sent to one server despite multiple servers are resolved"),
5858
RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"),
59-
CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel");
59+
CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"),
60+
ORCA_PER_RPC("report backend metrics per query"),
61+
ORCA_OOB("report backend metrics out-of-band");
6062

6163
private final String description;
6264

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java

+20
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.grpc.Grpc;
2323
import io.grpc.InsecureChannelCredentials;
2424
import io.grpc.InsecureServerCredentials;
25+
import io.grpc.LoadBalancerProvider;
26+
import io.grpc.LoadBalancerRegistry;
2527
import io.grpc.ManagedChannel;
2628
import io.grpc.ManagedChannelBuilder;
2729
import io.grpc.ServerBuilder;
@@ -60,6 +62,8 @@ public static void main(String[] args) throws Exception {
6062
TestUtils.installConscryptIfAvailable();
6163
final TestServiceClient client = new TestServiceClient();
6264
client.parseArgs(args);
65+
customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider();
66+
LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider);
6367
client.setUp();
6468

6569
try {
@@ -91,6 +95,7 @@ public static void main(String[] args) throws Exception {
9195
private int soakPerIterationMaxAcceptableLatencyMs = 1000;
9296
private int soakOverallTimeoutSeconds =
9397
soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000;
98+
private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider;
9499

95100
private Tester tester = new Tester();
96101

@@ -239,6 +244,10 @@ void setUp() {
239244
private synchronized void tearDown() {
240245
try {
241246
tester.tearDown();
247+
if (customBackendMetricsLoadBalancerProvider != null) {
248+
LoadBalancerRegistry.getDefaultRegistry()
249+
.deregister(customBackendMetricsLoadBalancerProvider);
250+
}
242251
} catch (RuntimeException ex) {
243252
throw ex;
244253
} catch (Exception ex) {
@@ -460,6 +469,17 @@ private void runTest(TestCases testCase) throws Exception {
460469
soakPerIterationMaxAcceptableLatencyMs,
461470
soakOverallTimeoutSeconds);
462471
break;
472+
473+
}
474+
475+
case ORCA_PER_RPC: {
476+
tester.testOrcaPerRpc();
477+
break;
478+
}
479+
480+
case ORCA_OOB: {
481+
tester.testOrcaOob();
482+
break;
463483
}
464484

465485
default:

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import io.grpc.ServerInterceptor;
2727
import io.grpc.Status;
2828
import io.grpc.internal.LogExceptionRunnable;
29+
import io.grpc.services.CallMetricRecorder;
30+
import io.grpc.services.MetricRecorder;
2931
import io.grpc.stub.ServerCallStreamObserver;
3032
import io.grpc.stub.StreamObserver;
3133
import io.grpc.testing.integration.Messages.Payload;
@@ -36,10 +38,13 @@
3638
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
3739
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
3840
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
41+
import io.grpc.testing.integration.Messages.TestOrcaReport;
3942
import java.util.ArrayDeque;
4043
import java.util.Arrays;
44+
import java.util.HashMap;
4145
import java.util.HashSet;
4246
import java.util.List;
47+
import java.util.Map;
4348
import java.util.Queue;
4449
import java.util.Random;
4550
import java.util.Set;
@@ -57,13 +62,19 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
5762

5863
private final ScheduledExecutorService executor;
5964
private final ByteString compressableBuffer;
65+
private final MetricRecorder metricRecorder;
6066

6167
/**
6268
* Constructs a controller using the given executor for scheduling response stream chunks.
6369
*/
64-
public TestServiceImpl(ScheduledExecutorService executor) {
70+
public TestServiceImpl(ScheduledExecutorService executor, MetricRecorder metricRecorder) {
6571
this.executor = executor;
6672
this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
73+
this.metricRecorder = metricRecorder;
74+
}
75+
76+
public TestServiceImpl(ScheduledExecutorService executor) {
77+
this(executor, MetricRecorder.newInstance());
6778
}
6879

6980
@Override
@@ -112,10 +123,33 @@ public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> response
112123
return;
113124
}
114125

126+
echoCallMetricsFromPayload(req.getOrcaPerRpcReport());
127+
echoMetricsFromPayload(req.getOrcaOobReport());
115128
responseObserver.onNext(responseBuilder.build());
116129
responseObserver.onCompleted();
117130
}
118131

132+
private static void echoCallMetricsFromPayload(TestOrcaReport report) {
133+
CallMetricRecorder recorder = CallMetricRecorder.getCurrent()
134+
.recordCpuUtilizationMetric(report.getCpuUtilization())
135+
.recordMemoryUtilizationMetric(report.getMemoryUtilization());
136+
for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) {
137+
recorder.recordUtilizationMetric(entry.getKey(), entry.getValue());
138+
}
139+
for (Map.Entry<String, Double> entry : report.getRequestCostMap().entrySet()) {
140+
recorder.recordCallMetric(entry.getKey(), entry.getValue());
141+
}
142+
}
143+
144+
private void echoMetricsFromPayload(TestOrcaReport report) {
145+
metricRecorder.setCpuUtilizationMetric(report.getCpuUtilization());
146+
metricRecorder.setMemoryUtilizationMetric(report.getMemoryUtilization());
147+
metricRecorder.setAllUtilizationMetrics(new HashMap<>());
148+
for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) {
149+
metricRecorder.putUtilizationMetric(entry.getKey(), entry.getValue());
150+
}
151+
}
152+
119153
/**
120154
* Given a request that specifies chunk size and interval between responses, creates and schedules
121155
* the response stream.

0 commit comments

Comments
 (0)