Skip to content

Commit c64ce96

Browse files
committed
chore: wire up x-goog-spanner-request-id generation to RPCs
Wire up the request-id generation with the respective RPCs as a first phase before applying grpc.CallSettings. Updates #3537
1 parent 7a8a29b commit c64ce96

File tree

9 files changed

+227
-6
lines changed

9 files changed

+227
-6
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import com.google.common.util.concurrent.ListenableFuture;
2828
import com.google.spanner.v1.BatchWriteResponse;
2929
import io.opentelemetry.api.common.Attributes;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.concurrent.atomic.AtomicInteger;
3033
import javax.annotation.Nullable;
3134

3235
class DatabaseClientImpl implements DatabaseClient {
@@ -40,6 +43,8 @@ class DatabaseClientImpl implements DatabaseClient {
4043
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
4144
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
4245
@VisibleForTesting final boolean useMultiplexedSessionForRW;
46+
private final int dbId;
47+
private final AtomicInteger nthRequest;
4348

4449
final boolean useMultiplexedSessionBlindWrite;
4550

@@ -86,6 +91,15 @@ class DatabaseClientImpl implements DatabaseClient {
8691
this.tracer = tracer;
8792
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
8893
this.commonAttributes = commonAttributes;
94+
95+
this.dbId = this.dbIdFromClientId(this.clientId);
96+
this.nthRequest = new AtomicInteger(0);
97+
}
98+
99+
private int dbIdFromClientId(String clientId) {
100+
int i = clientId.indexOf("-");
101+
String strWithValue = clientId.substring(i + 1);
102+
return Integer.parseInt(strWithValue);
89103
}
90104

91105
@VisibleForTesting
@@ -179,8 +193,22 @@ public CommitResponse writeAtLeastOnceWithOptions(
179193
return getMultiplexedSessionDatabaseClient()
180194
.writeAtLeastOnceWithOptions(mutations, options);
181195
}
196+
197+
int nthRequest = this.nextNthRequest();
198+
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
199+
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(this.dbId, channelId, nthRequest, 0);
200+
182201
return runWithSessionRetry(
183-
session -> session.writeAtLeastOnceWithOptions(mutations, options));
202+
(session) -> {
203+
reqId.incrementAttempt();
204+
// TODO: Update the channelId depending on the session that is inferred.
205+
ArrayList<TransactionOption> allOptions = new ArrayList<>();
206+
allOptions.add(new Options.RequestIdOption(reqId));
207+
allOptions.addAll(Arrays.asList(options));
208+
209+
return session.writeAtLeastOnceWithOptions(
210+
mutations, allOptions.toArray(new TransactionOption[0]));
211+
});
184212
} catch (RuntimeException e) {
185213
span.setStatus(e);
186214
throw e;
@@ -189,6 +217,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
189217
}
190218
}
191219

220+
private int nextNthRequest() {
221+
return this.nthRequest.addAndGet(1);
222+
}
223+
192224
@Override
193225
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
194226
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+34
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ void appendToOptions(Options options) {
512512
private RpcOrderBy orderBy;
513513
private RpcLockHint lockHint;
514514
private Boolean lastStatement;
515+
private XGoogSpannerRequestId reqId;
515516

516517
// Construction is via factory methods below.
517518
private Options() {}
@@ -568,6 +569,14 @@ String pageToken() {
568569
return pageToken;
569570
}
570571

572+
boolean hasReqId() {
573+
return reqId != null;
574+
}
575+
576+
XGoogSpannerRequestId reqId() {
577+
return reqId;
578+
}
579+
571580
boolean hasFilter() {
572581
return filter != null;
573582
}
@@ -1018,4 +1027,29 @@ public boolean equals(Object o) {
10181027
return o instanceof LastStatementUpdateOption;
10191028
}
10201029
}
1030+
1031+
static final class RequestIdOption extends InternalOption implements TransactionOption {
1032+
private final XGoogSpannerRequestId reqId;
1033+
1034+
RequestIdOption(XGoogSpannerRequestId reqId) {
1035+
this.reqId = reqId;
1036+
}
1037+
1038+
@Override
1039+
void appendToOptions(Options options) {
1040+
options.reqId = this.reqId;
1041+
}
1042+
1043+
@Override
1044+
public int hashCode() {
1045+
return RequestIdOption.class.hashCode();
1046+
}
1047+
1048+
@Override
1049+
public boolean equals(Object o) {
1050+
// TODO: Examine why the precedent for LastStatementUpdateOption
1051+
// does not check against the actual value.
1052+
return o instanceof RequestIdOption;
1053+
}
1054+
}
10211055
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void run() {
133133
.getTracer()
134134
.getCurrentSpan()
135135
.addAnnotation(String.format("Creating %d sessions", sessionCount));
136+
136137
while (remainingSessionsToCreate > 0) {
137138
try {
138139
sessions = internalBatchCreateSessions(remainingSessionsToCreate, channelHint);
@@ -387,6 +388,10 @@ private List<SessionImpl> internalBatchCreateSessions(
387388
.spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
388389
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
389390
try (IScope s = spanner.getTracer().withSpan(span)) {
391+
// TODO: Infer the caller client if possible else create separate
392+
// outside counter for such asynchronous operations and then also
393+
// increment the operations for each asynchronous operation.
394+
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 1, 1, 1);
390395
List<com.google.spanner.v1.Session> sessions =
391396
spanner
392397
.getRpc()
@@ -395,7 +400,7 @@ private List<SessionImpl> internalBatchCreateSessions(
395400
sessionCount,
396401
spanner.getOptions().getDatabaseRole(),
397402
spanner.getOptions().getSessionLabels(),
398-
options);
403+
reqId.withOptions(options));
399404
span.addAnnotation(
400405
String.format(
401406
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
261261
.setNanos(options.maxCommitDelay().getNano())
262262
.build());
263263
}
264+
264265
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);
265266

266267
if (commitRequestOptions != null) {
@@ -269,8 +270,14 @@ public CommitResponse writeAtLeastOnceWithOptions(
269270
CommitRequest request = requestBuilder.build();
270271
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
271272
try (IScope s = tracer.withSpan(span)) {
273+
// Inject the request id into the request options.
274+
XGoogSpannerRequestId reqId = options.reqId();
272275
return SpannerRetryHelper.runTxWithRetriesOnAborted(
273-
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
276+
() -> {
277+
reqId.incrementAttempt();
278+
return new CommitResponse(
279+
spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
280+
});
274281
} catch (RuntimeException e) {
275282
span.setStatus(e);
276283
throw e;
@@ -423,7 +430,11 @@ public ApiFuture<Empty> asyncClose() {
423430
public void close() {
424431
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
425432
try (IScope s = tracer.withSpan(span)) {
426-
spanner.getRpc().deleteSession(getName(), getOptions());
433+
// TODO: Infer the caller client if possible else create separate
434+
// outside counter for such asynchronous operations and then also
435+
// increment the operations for each asynchronous operation.
436+
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 2, 1, 1);
437+
spanner.getRpc().deleteSession(getName(), reqId.withOptions(options));
427438
} catch (RuntimeException e) {
428439
span.setStatus(e);
429440
throw e;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java

+19
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.api.core.InternalApi;
20+
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2021
import com.google.common.annotations.VisibleForTesting;
22+
import io.grpc.Metadata;
2123
import java.math.BigInteger;
2224
import java.security.SecureRandom;
25+
import java.util.HashMap;
26+
import java.util.Map;
2327
import java.util.Objects;
2428

2529
@InternalApi
@@ -28,6 +32,9 @@ public class XGoogSpannerRequestId {
2832
@VisibleForTesting
2933
static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId();
3034

35+
public static final Metadata.Key<String> REQUEST_HEADER_KEY =
36+
Metadata.Key.of("x-goog-spanner-request-id", Metadata.ASCII_STRING_MARSHALLER);
37+
3138
@VisibleForTesting
3239
static final long VERSION = 1; // The version of the specification being implemented.
3340

@@ -81,6 +88,18 @@ public boolean equals(Object other) {
8188
&& Objects.equals(this.attempt, otherReqId.attempt);
8289
}
8390

91+
public void incrementAttempt() {
92+
this.attempt++;
93+
}
94+
95+
@SuppressWarnings("unchecked")
96+
public Map withOptions(Map options) {
97+
Map copyOptions = new HashMap<>();
98+
copyOptions.putAll(options);
99+
copyOptions.put(SpannerRpc.Option.REQUEST_ID, this.toString());
100+
return copyOptions;
101+
}
102+
84103
@Override
85104
public int hashCode() {
86105
return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+16
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import com.google.cloud.spanner.SpannerOptions;
7272
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
7373
import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider;
74+
import com.google.cloud.spanner.XGoogSpannerRequestId;
7475
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
7576
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
7677
import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory;
@@ -88,6 +89,7 @@
8889
import com.google.common.base.Supplier;
8990
import com.google.common.base.Suppliers;
9091
import com.google.common.collect.ImmutableList;
92+
import com.google.common.collect.ImmutableMap;
9193
import com.google.common.collect.ImmutableSet;
9294
import com.google.common.io.Resources;
9395
import com.google.common.util.concurrent.RateLimiter;
@@ -193,6 +195,7 @@
193195
import java.nio.charset.Charset;
194196
import java.nio.charset.StandardCharsets;
195197
import java.time.Duration;
198+
import java.util.Collections;
196199
import java.util.Comparator;
197200
import java.util.HashMap;
198201
import java.util.List;
@@ -2018,6 +2021,8 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20182021
// Set channel affinity in GAX.
20192022
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
20202023
}
2024+
String methodName = method.getFullMethodName();
2025+
context = withRequestId(context, options, methodName);
20212026
}
20222027
if (compressorName != null) {
20232028
// This sets the compressor for Client -> Server.
@@ -2041,6 +2046,8 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20412046
context
20422047
.withStreamWaitTimeoutDuration(waitTimeout)
20432048
.withStreamIdleTimeoutDuration(idleTimeout);
2049+
2050+
// TODO: Infer the x-goog-spanner-request-id header and inject it in accordingly.
20442051
CallContextConfigurator configurator = SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY.get();
20452052
ApiCallContext apiCallContextFromContext = null;
20462053
if (configurator != null) {
@@ -2049,6 +2056,15 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20492056
return (GrpcCallContext) context.merge(apiCallContextFromContext);
20502057
}
20512058

2059+
GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) {
2060+
String reqId = (String) options.get(Option.REQUEST_ID);
2061+
System.out.println("\033[32moptions.reqId: " + reqId + "\033[00m " + methodName);
2062+
Map<String, List<String>> withReqId =
2063+
ImmutableMap.of(
2064+
XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(), Collections.singletonList(reqId));
2065+
return context.withExtraHeaders(withReqId);
2066+
}
2067+
20522068
void registerResponseObserver(SpannerResponseObserver responseObserver) {
20532069
responseObservers.add(responseObserver);
20542070
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@
7878
public interface SpannerRpc extends ServiceRpc {
7979
/** Options passed in {@link SpannerRpc} methods to control how an RPC is issued. */
8080
enum Option {
81-
CHANNEL_HINT("Channel Hint");
81+
CHANNEL_HINT("Channel Hint"),
82+
REQUEST_ID("Request Id");
8283

8384
private final String value;
8485

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

+61-1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import io.grpc.Metadata;
106106
import io.grpc.MethodDescriptor;
107107
import io.grpc.Server;
108+
import io.grpc.ServerInterceptors;
108109
import io.grpc.Status;
109110
import io.grpc.StatusRuntimeException;
110111
import io.grpc.inprocess.InProcessServerBuilder;
@@ -152,6 +153,7 @@ public class DatabaseClientImplTest {
152153
private static final String DATABASE_NAME =
153154
String.format(
154155
"projects/%s/instances/%s/databases/%s", TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE);
156+
private static XGoogSpannerRequestIdTest.ServerHeaderEnforcer xGoogReqIdInterceptor;
155157
private static MockSpannerServiceImpl mockSpanner;
156158
private static Server server;
157159
private static LocalChannelProvider channelProvider;
@@ -220,13 +222,14 @@ public static void startStaticServer() throws IOException {
220222
StatementResult.query(SELECT1_FROM_TABLE, MockSpannerTestUtil.SELECT1_RESULTSET));
221223
mockSpanner.setBatchWriteResult(BATCH_WRITE_RESPONSES);
222224

225+
xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer();
223226
executor = Executors.newSingleThreadExecutor();
224227
String uniqueName = InProcessServerBuilder.generateName();
225228
server =
226229
InProcessServerBuilder.forName(uniqueName)
227230
// We need to use a real executor for timeouts to occur.
228231
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
229-
.addService(mockSpanner)
232+
.addService(ServerInterceptors.intercept(mockSpanner, xGoogReqIdInterceptor))
230233
.build()
231234
.start();
232235
channelProvider = LocalChannelProvider.create(uniqueName);
@@ -5168,6 +5171,63 @@ public void testRetryOnResourceExhausted() {
51685171
}
51695172
}
51705173

5174+
@Test
5175+
public void testSelect1HasXGoogRequestIdHeader() {
5176+
SingerInfo info = SingerInfo.newBuilder().setSingerId(1).build();
5177+
Statement statement = Statement.of("SELECT * FROM FOO");
5178+
mockSpanner.putStatementResult(
5179+
StatementResult.query(
5180+
statement,
5181+
com.google.spanner.v1.ResultSet.newBuilder()
5182+
.setMetadata(
5183+
ResultSetMetadata.newBuilder()
5184+
.setRowType(
5185+
StructType.newBuilder()
5186+
.addFields(
5187+
Field.newBuilder()
5188+
.setName("a1")
5189+
.setType(
5190+
Type.newBuilder()
5191+
.setCodeValue(Integer.MAX_VALUE)
5192+
.build())
5193+
.build())
5194+
.addFields(
5195+
Field.newBuilder()
5196+
.setName("b1")
5197+
.setType(
5198+
Type.newBuilder()
5199+
.setCodeValue(Integer.MAX_VALUE)
5200+
.build())
5201+
.build())
5202+
.build())
5203+
.build())
5204+
.addRows(
5205+
ListValue.newBuilder()
5206+
.addValues(
5207+
com.google.protobuf.Value.newBuilder()
5208+
.setListValue(
5209+
ListValue.newBuilder()
5210+
.addValues(
5211+
com.google.protobuf.Value.newBuilder()
5212+
.setNumberValue(6.626)
5213+
.build())
5214+
.addValues(
5215+
com.google.protobuf.Value.newBuilder()
5216+
.setNumberValue(-6.626)
5217+
.build())
5218+
.build())
5219+
.build())
5220+
.build())
5221+
.build()));
5222+
5223+
DatabaseClient client =
5224+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
5225+
try (ResultSet resultSet = client.singleUse().executeQuery(statement)) {
5226+
assertTrue(resultSet.next());
5227+
assertAsString(ImmutableList.of("6.626", "-6.626"), resultSet, 0);
5228+
}
5229+
}
5230+
51715231
@Test
51725232
public void testSessionPoolExhaustedError_containsStackTraces() {
51735233
assumeFalse(

0 commit comments

Comments
 (0)