@@ -1285,55 +1285,35 @@ private interface VisitCallback {
1285
1285
default void onStart (JsonResponse response , boolean fullyApplied ) throws IOException { }
1286
1286
1287
1287
/** Called for every document or removal received from backend visitors—must call the ack for these to proceed. */
1288
- default void onDocument (JsonResponse response , Document document , DocumentId removeId , long persistedTimestamp , Runnable ack , Consumer <String > onError ) { }
1288
+ default void onDocument (JsonResponse response , Document document , DocumentId removeId , Runnable ack , Consumer <String > onError ) { }
1289
1289
1290
1290
/** Called at the end of response rendering, before generic status data is written. Called from a dedicated thread pool. */
1291
1291
default void onEnd (JsonResponse response ) throws IOException { }
1292
1292
}
1293
1293
1294
- @ FunctionalInterface
1295
- private interface VisitProcessingCallback {
1296
- Result apply (DocumentId id , long persistedTimestamp , DocumentOperationParameters params );
1297
- }
1298
-
1299
1294
private void visitAndDelete (HttpRequest request , VisitorParameters parameters , ResponseHandler handler ,
1300
1295
TestAndSetCondition condition , String route ) {
1301
- visitAndProcess (request , parameters , true , handler , route , (id , timestamp , operationParameters ) -> {
1296
+ visitAndProcess (request , parameters , true , handler , route , (id , operationParameters ) -> {
1302
1297
DocumentRemove remove = new DocumentRemove (id );
1303
- // If the backend provided a persisted timestamp, we set a condition that specifies _both_ the
1304
- // original selection and the timestamp. If the backend supports timestamp-predicated TaS operations,
1305
- // it will ignore the selection entirely and only look at the timestamp. If it does not, it will fall
1306
- // back to evaluating the selection, which preserves legacy behavior.
1307
- if (timestamp != 0 ) {
1308
- remove .setCondition (TestAndSetCondition .ofRequiredTimestampWithSelectionFallback (
1309
- timestamp , condition .getSelection ()));
1310
- } else {
1311
- remove .setCondition (condition );
1312
- }
1298
+ remove .setCondition (condition );
1313
1299
return asyncSession .remove (remove , operationParameters );
1314
1300
});
1315
1301
}
1316
1302
1317
1303
private void visitAndUpdate (HttpRequest request , VisitorParameters parameters , boolean fullyApplied ,
1318
1304
ResponseHandler handler , DocumentUpdate protoUpdate , String route ) {
1319
- visitAndProcess (request , parameters , fullyApplied , handler , route , (id , timestamp , operationParameters ) -> {
1320
- DocumentUpdate update = new DocumentUpdate (protoUpdate );
1321
- // See `visitAndDelete()` for rationale for sending down a timestamp _and_ the original condition.
1322
- if (timestamp != 0 ) {
1323
- update .setCondition (TestAndSetCondition .ofRequiredTimestampWithSelectionFallback (
1324
- timestamp , protoUpdate .getCondition ().getSelection ()));
1325
- } // else: use condition already set from protoUpdate
1326
- update .setId (id );
1327
- return asyncSession .update (update , operationParameters );
1305
+ visitAndProcess (request , parameters , fullyApplied , handler , route , (id , operationParameters ) -> {
1306
+ DocumentUpdate update = new DocumentUpdate (protoUpdate );
1307
+ update .setId (id );
1308
+ return asyncSession .update (update , operationParameters );
1328
1309
});
1329
1310
}
1330
1311
1331
1312
private void visitAndProcess (HttpRequest request , VisitorParameters parameters , boolean fullyApplied ,
1332
1313
ResponseHandler handler ,
1333
- String route , VisitProcessingCallback operation ) {
1314
+ String route , BiFunction < DocumentId , DocumentOperationParameters , Result > operation ) {
1334
1315
visit (request , parameters , false , fullyApplied , handler , new VisitCallback () {
1335
- @ Override public void onDocument (JsonResponse response , Document document , DocumentId removeId ,
1336
- long persistedTimestamp , Runnable ack , Consumer <String > onError ) {
1316
+ @ Override public void onDocument (JsonResponse response , Document document , DocumentId removeId , Runnable ack , Consumer <String > onError ) {
1337
1317
DocumentOperationParameters operationParameters = parameters ().withRoute (route )
1338
1318
.withResponseHandler (operationResponse -> {
1339
1319
outstanding .decrementAndGet ();
@@ -1352,7 +1332,7 @@ private void visitAndProcess(HttpRequest request, VisitorParameters parameters,
1352
1332
}
1353
1333
});
1354
1334
visitOperations .offer (() -> {
1355
- Result result = operation .apply (document .getId (), persistedTimestamp , operationParameters );
1335
+ Result result = operation .apply (document .getId (), operationParameters );
1356
1336
if (result .type () == Result .ResultType .TRANSIENT_ERROR )
1357
1337
return false ;
1358
1338
@@ -1377,8 +1357,7 @@ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, Re
1377
1357
1378
1358
response .writeDocumentsArrayStart ();
1379
1359
}
1380
- @ Override public void onDocument (JsonResponse response , Document document , DocumentId removeId ,
1381
- long persistedTimestamp , Runnable ack , Consumer <String > onError ) {
1360
+ @ Override public void onDocument (JsonResponse response , Document document , DocumentId removeId , Runnable ack , Consumer <String > onError ) {
1382
1361
try {
1383
1362
if (streamed ) {
1384
1363
CompletionHandler completion = new CompletionHandler () {
@@ -1478,21 +1457,13 @@ private void visit(HttpRequest request, VisitorParameters parameters, boolean st
1478
1457
@ Override public void onMessage (Message m , AckToken token ) {
1479
1458
Document document = null ;
1480
1459
DocumentId removeId = null ;
1481
- long persistedTimestamp = 0 ;
1482
- if (m instanceof PutDocumentMessage put ) {
1483
- document = put .getDocumentPut ().getDocument ();
1484
- persistedTimestamp = put .getPersistedTimestamp ();
1485
- } else if (parameters .visitRemoves () && m instanceof RemoveDocumentMessage remove ) {
1486
- removeId = remove .getDocumentId ();
1487
- persistedTimestamp = remove .getPersistedTimestamp ();
1488
- } else {
1489
- throw new UnsupportedOperationException ("Got unsupported message type: " + m .getClass ().getName ());
1490
- }
1460
+ if (m instanceof PutDocumentMessage put ) document = put .getDocumentPut ().getDocument ();
1461
+ else if (parameters .visitRemoves () && m instanceof RemoveDocumentMessage remove ) removeId = remove .getDocumentId ();
1462
+ else throw new UnsupportedOperationException ("Got unsupported message type: " + m .getClass ().getName ());
1491
1463
locallyReceivedDocCount .getAndAdd (1 );
1492
1464
callback .onDocument (response ,
1493
1465
document ,
1494
1466
removeId ,
1495
- persistedTimestamp ,
1496
1467
() -> ack (token ),
1497
1468
errorMessage -> {
1498
1469
error .set (errorMessage );
0 commit comments