@@ -169,7 +169,15 @@ public CommitResponse writeWithOptions(
169
169
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
170
170
return getMultiplexedSessionDatabaseClient ().writeWithOptions (mutations , options );
171
171
}
172
- return runWithSessionRetry (session -> session .writeWithOptions (mutations , options ));
172
+ int channelId = 1 ; /* TODO: infer the channelId from the gRPC channel of the session */
173
+ XGoogSpannerRequestId reqId =
174
+ XGoogSpannerRequestId .of (this .dbId , channelId , this .nextNthRequest (), 0 );
175
+ return runWithSessionRetry (
176
+ session -> {
177
+ reqId .incrementAttempt ();
178
+ // TODO: Update the channelId depending on the session that is inferred.
179
+ return session .writeWithOptions (mutations , appendReqIdToOptions (reqId , options ));
180
+ });
173
181
} catch (RuntimeException e ) {
174
182
span .setStatus (e );
175
183
throw e ;
@@ -194,20 +202,15 @@ public CommitResponse writeAtLeastOnceWithOptions(
194
202
.writeAtLeastOnceWithOptions (mutations , options );
195
203
}
196
204
197
- int nthRequest = this .nextNthRequest ();
198
205
int channelId = 1 ; /* TODO: infer the channelId from the gRPC channel of the session */
199
- XGoogSpannerRequestId reqId = XGoogSpannerRequestId . of ( this . dbId , channelId , nthRequest , 0 );
200
-
206
+ XGoogSpannerRequestId reqId =
207
+ XGoogSpannerRequestId . of ( this . dbId , channelId , this . nextNthRequest (), 0 );
201
208
return runWithSessionRetry (
202
209
(session ) -> {
203
210
reqId .incrementAttempt ();
204
211
// TODO: Update the channelId depending on the session that is inferred.
205
- ArrayList <TransactionOption > allOptions = new ArrayList <>();
206
- allOptions .add (new Options .RequestIdOption (reqId ));
207
- allOptions .addAll (Arrays .asList (options ));
208
-
209
212
return session .writeAtLeastOnceWithOptions (
210
- mutations , allOptions . toArray ( new TransactionOption [ 0 ] ));
213
+ mutations , appendReqIdToOptions ( reqId , options ));
211
214
});
212
215
} catch (RuntimeException e ) {
213
216
span .setStatus (e );
@@ -217,6 +220,24 @@ public CommitResponse writeAtLeastOnceWithOptions(
217
220
}
218
221
}
219
222
223
+ private TransactionOption [] appendReqIdToOptions (
224
+ XGoogSpannerRequestId reqId , TransactionOption ... options ) {
225
+ ArrayList <TransactionOption > allOptions = new ArrayList <>();
226
+ allOptions .add (new Options .RequestIdOption (reqId ));
227
+ allOptions .addAll (Arrays .asList (options ));
228
+
229
+ return allOptions .toArray (new TransactionOption [0 ]);
230
+ }
231
+
232
+ private UpdateOption [] appendReqIdToOptions (
233
+ XGoogSpannerRequestId reqId , UpdateOption ... options ) {
234
+ ArrayList <UpdateOption > allOptions = new ArrayList <>();
235
+ allOptions .add (new Options .RequestIdOption (reqId ));
236
+ allOptions .addAll (Arrays .asList (options ));
237
+
238
+ return allOptions .toArray (new UpdateOption [0 ]);
239
+ }
240
+
220
241
private int nextNthRequest () {
221
242
return this .nthRequest .addAndGet (1 );
222
243
}
@@ -230,7 +251,17 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
230
251
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
231
252
return getMultiplexedSessionDatabaseClient ().batchWriteAtLeastOnce (mutationGroups , options );
232
253
}
233
- return runWithSessionRetry (session -> session .batchWriteAtLeastOnce (mutationGroups , options ));
254
+
255
+ int channelId = 1 ; /* TODO: infer the channelId from the gRPC channel of the session */
256
+ XGoogSpannerRequestId reqId =
257
+ XGoogSpannerRequestId .of (this .dbId , channelId , this .nextNthRequest (), 0 );
258
+
259
+ return runWithSessionRetry (
260
+ (session ) -> {
261
+ reqId .incrementAttempt ();
262
+ return session .batchWriteAtLeastOnce (
263
+ mutationGroups , appendReqIdToOptions (reqId , options ));
264
+ });
234
265
} catch (RuntimeException e ) {
235
266
span .setStatus (e );
236
267
throw e ;
@@ -378,7 +409,14 @@ private long executePartitionedUpdateWithPooledSession(
378
409
final Statement stmt , final UpdateOption ... options ) {
379
410
ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION , commonAttributes );
380
411
try (IScope s = tracer .withSpan (span )) {
381
- return runWithSessionRetry (session -> session .executePartitionedUpdate (stmt , options ));
412
+ int channelId = 1 ; /* TODO: infer the channelId from the gRPC channel of the session */
413
+ XGoogSpannerRequestId reqId =
414
+ XGoogSpannerRequestId .of (this .dbId , channelId , this .nextNthRequest (), 0 );
415
+ return runWithSessionRetry (
416
+ session -> {
417
+ reqId .incrementAttempt ();
418
+ return session .executePartitionedUpdate (stmt , appendReqIdToOptions (reqId , options ));
419
+ });
382
420
} catch (RuntimeException e ) {
383
421
span .setStatus (e );
384
422
span .end ();
0 commit comments