Skip to content

Commit d66e013

Browse files
committed
Wire up more x-goog-spanner-request-id propagations
1 parent c64ce96 commit d66e013

File tree

8 files changed

+94
-31
lines changed

8 files changed

+94
-31
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+47-13
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import com.google.common.util.concurrent.ListenableFuture;
2828
import com.google.spanner.v1.BatchWriteResponse;
2929
import io.opentelemetry.api.common.Attributes;
30-
import java.util.ArrayList;
3130
import java.util.Arrays;
31+
import java.util.List;
3232
import java.util.concurrent.atomic.AtomicInteger;
3333
import javax.annotation.Nullable;
3434

@@ -169,7 +169,15 @@ public CommitResponse writeWithOptions(
169169
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
170170
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
171171
}
172-
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
172+
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
173+
XGoogSpannerRequestId reqId =
174+
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);
175+
return runWithSessionRetry(
176+
session -> {
177+
reqId.incrementAttempt();
178+
// TODO: Update the channelId depending on the session that is inferred.
179+
return session.writeWithOptions(mutations, appendReqIdToOptions(reqId, options));
180+
});
173181
} catch (RuntimeException e) {
174182
span.setStatus(e);
175183
throw e;
@@ -194,20 +202,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
194202
.writeAtLeastOnceWithOptions(mutations, options);
195203
}
196204

197-
int nthRequest = this.nextNthRequest();
198205
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
199-
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(this.dbId, channelId, nthRequest, 0);
200-
206+
XGoogSpannerRequestId reqId =
207+
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);
201208
return runWithSessionRetry(
202209
(session) -> {
203210
reqId.incrementAttempt();
204211
// TODO: Update the channelId depending on the session that is inferred.
205-
ArrayList<TransactionOption> allOptions = new ArrayList<>();
206-
allOptions.add(new Options.RequestIdOption(reqId));
207-
allOptions.addAll(Arrays.asList(options));
208-
209212
return session.writeAtLeastOnceWithOptions(
210-
mutations, allOptions.toArray(new TransactionOption[0]));
213+
mutations, appendReqIdToOptions(reqId, options));
211214
});
212215
} catch (RuntimeException e) {
213216
span.setStatus(e);
@@ -217,8 +220,22 @@ public CommitResponse writeAtLeastOnceWithOptions(
217220
}
218221
}
219222

223+
private TransactionOption[] appendReqIdToOptions(
224+
XGoogSpannerRequestId reqId, TransactionOption... options) {
225+
List<TransactionOption> allOptions = Arrays.asList(options);
226+
allOptions.add(new Options.RequestIdOption(reqId));
227+
return allOptions.toArray(new TransactionOption[0]);
228+
}
229+
230+
private UpdateOption[] appendReqIdToOptions(
231+
XGoogSpannerRequestId reqId, UpdateOption... options) {
232+
List<UpdateOption> allOptions = Arrays.asList(options);
233+
allOptions.add(new Options.RequestIdOption(reqId));
234+
return allOptions.toArray(new UpdateOption[0]);
235+
}
236+
220237
private int nextNthRequest() {
221-
return this.nthRequest.addAndGet(1);
238+
return this.nthRequest.incrementAndGet();
222239
}
223240

224241
@Override
@@ -230,7 +247,17 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
230247
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
231248
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
232249
}
233-
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
250+
251+
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
252+
XGoogSpannerRequestId reqId =
253+
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);
254+
255+
return runWithSessionRetry(
256+
(session) -> {
257+
reqId.incrementAttempt();
258+
return session.batchWriteAtLeastOnce(
259+
mutationGroups, appendReqIdToOptions(reqId, options));
260+
});
234261
} catch (RuntimeException e) {
235262
span.setStatus(e);
236263
throw e;
@@ -378,7 +405,14 @@ private long executePartitionedUpdateWithPooledSession(
378405
final Statement stmt, final UpdateOption... options) {
379406
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
380407
try (IScope s = tracer.withSpan(span)) {
381-
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
408+
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
409+
XGoogSpannerRequestId reqId =
410+
XGoogSpannerRequestId.of(this.dbId, channelId, this.nextNthRequest(), 0);
411+
return runWithSessionRetry(
412+
session -> {
413+
reqId.incrementAttempt();
414+
return session.executePartitionedUpdate(stmt, appendReqIdToOptions(reqId, options));
415+
});
382416
} catch (RuntimeException e) {
383417
span.setStatus(e);
384418
span.end();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,8 @@ public boolean equals(Object o) {
10281028
}
10291029
}
10301030

1031-
static final class RequestIdOption extends InternalOption implements TransactionOption {
1031+
static final class RequestIdOption extends InternalOption
1032+
implements UpdateOption, TransactionOption {
10321033
private final XGoogSpannerRequestId reqId;
10331034

10341035
RequestIdOption(XGoogSpannerRequestId reqId) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

+1
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ protected PartialResultSet computeNext() {
243243
&& (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) {
244244
return buffer.pop();
245245
}
246+
246247
try {
247248
if (stream.hasNext()) {
248249
PartialResultSet next = stream.next();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.atomic.AtomicInteger;
3435
import javax.annotation.concurrent.GuardedBy;
3536

3637
/** Client for creating single sessions and batches of sessions. */
@@ -175,6 +176,12 @@ interface SessionConsumer {
175176
private final DatabaseId db;
176177
private final Attributes commonAttributes;
177178

179+
// SessionClient is created long before a DatabaseClientImpl is created,
180+
// as batch sessions are firstly created then later attached to each Client.
181+
private static AtomicInteger NTH_ID = new AtomicInteger(0);
182+
private final int nthId;
183+
private final AtomicInteger nthRequest;
184+
178185
@GuardedBy("this")
179186
private volatile long sessionChannelCounter;
180187

@@ -187,6 +194,8 @@ interface SessionConsumer {
187194
this.executorFactory = executorFactory;
188195
this.executor = executorFactory.get();
189196
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
197+
this.nthId = SessionClient.NTH_ID.incrementAndGet();
198+
this.nthRequest = new AtomicInteger(0);
190199
}
191200

192201
@Override
@@ -207,19 +216,23 @@ SessionImpl createSession() {
207216
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
208217
// which is also a valid channel hint.
209218
final Map<SpannerRpc.Option, ?> options;
219+
final long channelId;
210220
synchronized (this) {
211221
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
222+
channelId = sessionChannelCounter;
212223
}
213224
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
214225
try (IScope s = spanner.getTracer().withSpan(span)) {
226+
XGoogSpannerRequestId reqId =
227+
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1);
215228
com.google.spanner.v1.Session session =
216229
spanner
217230
.getRpc()
218231
.createSession(
219232
db.getName(),
220233
spanner.getOptions().getDatabaseRole(),
221234
spanner.getOptions().getSessionLabels(),
222-
options);
235+
reqId.withOptions(options));
223236
SessionReference sessionReference =
224237
new SessionReference(
225238
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
@@ -388,10 +401,8 @@ private List<SessionImpl> internalBatchCreateSessions(
388401
.spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
389402
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
390403
try (IScope s = spanner.getTracer().withSpan(span)) {
391-
// TODO: Infer the caller client if possible else create separate
392-
// outside counter for such asynchronous operations and then also
393-
// increment the operations for each asynchronous operation.
394-
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 1, 1, 1);
404+
XGoogSpannerRequestId reqId =
405+
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
395406
List<com.google.spanner.v1.Session> sessions =
396407
spanner
397408
.getRpc()

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,8 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
423423

424424
@Override
425425
public ApiFuture<Empty> asyncClose() {
426-
return spanner.getRpc().asyncDeleteSession(getName(), getOptions());
426+
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(1, 2, 1, 1);
427+
return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(options));
427428
}
428429

429430
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -2047,7 +2047,6 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20472047
.withStreamWaitTimeoutDuration(waitTimeout)
20482048
.withStreamIdleTimeoutDuration(idleTimeout);
20492049

2050-
// TODO: Infer the x-goog-spanner-request-id header and inject it in accordingly.
20512050
CallContextConfigurator configurator = SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY.get();
20522051
ApiCallContext apiCallContextFromContext = null;
20532052
if (configurator != null) {
@@ -2058,6 +2057,10 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20582057

20592058
GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) {
20602059
String reqId = (String) options.get(Option.REQUEST_ID);
2060+
if (reqId == null) {
2061+
return context;
2062+
}
2063+
20612064
System.out.println("\033[32moptions.reqId: " + reqId + "\033[00m " + methodName);
20622065
Map<String, List<String>> withReqId =
20632066
ImmutableMap.of(

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
import java.util.Arrays;
121121
import java.util.Base64;
122122
import java.util.Collections;
123+
import java.util.HashSet;
123124
import java.util.List;
124125
import java.util.Random;
125126
import java.util.Set;
@@ -222,7 +223,9 @@ public static void startStaticServer() throws IOException {
222223
StatementResult.query(SELECT1_FROM_TABLE, MockSpannerTestUtil.SELECT1_RESULTSET));
223224
mockSpanner.setBatchWriteResult(BATCH_WRITE_RESPONSES);
224225

225-
xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer();
226+
Set<String> checkMethods =
227+
new HashSet(Arrays.asList("google.spanner.v1.Spanner/BatchCreateSessions"));
228+
xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer(checkMethods);
226229
executor = Executors.newSingleThreadExecutor();
227230
String uniqueName = InProcessServerBuilder.generateName();
228231
server =

google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java

+18-9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.ServerInterceptor;
2828
import java.util.ArrayList;
2929
import java.util.List;
30+
import java.util.Set;
3031
import java.util.regex.Matcher;
3132
import java.util.regex.Pattern;
3233
import org.junit.Test;
@@ -61,27 +62,35 @@ public void testEnsureHexadecimalFormatForRandProcessID() {
6162

6263
public static class ServerHeaderEnforcer implements ServerInterceptor {
6364
private List<String> gotValues;
65+
private Set<String> checkMethods;
6466

65-
ServerHeaderEnforcer() {
67+
ServerHeaderEnforcer(Set<String> checkMethods) {
6668
this.gotValues = new ArrayList<String>();
69+
this.checkMethods = checkMethods;
6770
}
6871

6972
@Override
7073
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
7174
ServerCall<ReqT, RespT> call,
7275
final Metadata requestHeaders,
7376
ServerCallHandler<ReqT, RespT> next) {
77+
String methodName = call.getMethodDescriptor().getFullMethodName();
78+
if (!this.checkMethods.contains(methodName)) {
79+
return next.startCall(call, requestHeaders);
80+
}
81+
7482
// Firstly assert and validate that at least we've got a requestId.
7583
String gotReqId = requestHeaders.get(XGoogSpannerRequestId.REQUEST_HEADER_KEY);
76-
assertNotNull(gotReqId);
7784
Matcher m = XGoogSpannerRequestIdTest.REGEX_RAND_PROCESS_ID.matcher(gotReqId);
78-
String message =
79-
String.format(
80-
"%s lacks %s in %s",
81-
call.getMethodDescriptor().getFullMethodName(),
82-
XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(),
83-
gotReqId);
84-
System.out.println("\033[32mMessage: " + message + "\033[00m");
85+
if (!m.matches()) {
86+
String message =
87+
String.format(
88+
"%s lacks %s", methodName, XGoogSpannerRequestId.REQUEST_HEADER_KEY.name());
89+
System.out.println("\033[31mMessage: " + message + "\033[00m");
90+
} else {
91+
System.out.println("\033[32mMessage: " + methodName + " has " + gotReqId + "\033[00m");
92+
}
93+
assertNotNull(gotReqId);
8594
assertTrue(m.matches());
8695

8796
this.gotValues.add(gotReqId);

0 commit comments

Comments
 (0)