Skip to content

Commit d54dda6

Browse files
Backport to branch(3) : Fix validation logic in Consensus Commit to correctly handle Get with index (#2695)
Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent 64ac80e commit d54dda6

File tree

3 files changed

+350
-16
lines changed

3 files changed

+350
-16
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.base.MoreObjects;
88
import com.google.common.collect.ComparisonChain;
99
import com.google.common.collect.Iterators;
10+
import com.scalar.db.api.ConditionSetBuilder;
1011
import com.scalar.db.api.Delete;
1112
import com.scalar.db.api.DistributedStorage;
1213
import com.scalar.db.api.Get;
@@ -43,6 +44,7 @@
4344
import java.util.Set;
4445
import java.util.concurrent.ConcurrentHashMap;
4546
import java.util.concurrent.ConcurrentMap;
47+
import java.util.stream.Collectors;
4648
import javax.annotation.Nonnull;
4749
import javax.annotation.concurrent.Immutable;
4850
import javax.annotation.concurrent.NotThreadSafe;
@@ -489,24 +491,20 @@ void toSerializable(DistributedStorage storage)
489491
// Get set is re-validated to check if there is no anti-dependency
490492
for (Map.Entry<Get, Optional<TransactionResult>> entry : getSet.entrySet()) {
491493
Get get = entry.getKey();
492-
Key key = new Key(get);
493-
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
494-
continue;
495-
}
496494

497-
tasks.add(
498-
() -> {
499-
Optional<TransactionResult> originalResult = getSet.get(get);
500-
// Only get the tx_id column because we use only them to compare
501-
get.clearProjections();
502-
get.withProjection(Attribute.ID);
495+
if (ScalarDbUtils.isSecondaryIndexSpecified(get, getTableMetadata(get))) {
496+
// For Get with index
497+
tasks.add(() -> validateGetWithIndexResult(storage, get, entry.getValue()));
498+
} else {
499+
// For other Get
500+
501+
Key key = new Key(get);
502+
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
503+
continue;
504+
}
503505

504-
// Check if a read record is not changed
505-
Optional<TransactionResult> latestResult = storage.get(get).map(TransactionResult::new);
506-
if (isChanged(latestResult, originalResult)) {
507-
throwExceptionDueToAntiDependency();
508-
}
509-
});
506+
tasks.add(() -> validateGetResult(storage, get, entry.getValue()));
507+
}
510508
}
511509

512510
parallelExecutor.validate(tasks, getId());
@@ -631,6 +629,48 @@ private void validateScanResults(
631629
}
632630
}
633631

632+
private void validateGetWithIndexResult(
633+
DistributedStorage storage, Get get, Optional<TransactionResult> originalResult)
634+
throws ExecutionException, ValidationConflictException {
635+
assert get.forNamespace().isPresent() && get.forTable().isPresent();
636+
637+
// If this transaction or another transaction inserts records into the index range,
638+
// the Get with index operation may retrieve multiple records, which would result in
639+
// an IllegalArgumentException. Therefore, we use Scan with index instead.
640+
Scan scanWithIndex =
641+
Scan.newBuilder()
642+
.namespace(get.forNamespace().get())
643+
.table(get.forTable().get())
644+
.indexKey(get.getPartitionKey())
645+
.whereOr(
646+
get.getConjunctions().stream()
647+
.map(c -> ConditionSetBuilder.andConditionSet(c.getConditions()).build())
648+
.collect(Collectors.toSet()))
649+
.consistency(get.getConsistency())
650+
.attributes(get.getAttributes())
651+
.build();
652+
653+
LinkedHashMap<Key, TransactionResult> results = new LinkedHashMap<>(1);
654+
originalResult.ifPresent(r -> results.put(new Snapshot.Key(scanWithIndex, r), r));
655+
656+
// Validate the result to check if there is no anti-dependency
657+
validateScanResults(storage, scanWithIndex, results);
658+
}
659+
660+
private void validateGetResult(
661+
DistributedStorage storage, Get get, Optional<TransactionResult> originalResult)
662+
throws ExecutionException, ValidationConflictException {
663+
// Only get the tx_id column because we use only them to compare
664+
get.clearProjections();
665+
get.withProjection(Attribute.ID);
666+
667+
// Check if a read record is not changed
668+
Optional<TransactionResult> latestResult = storage.get(get).map(TransactionResult::new);
669+
if (isChanged(latestResult, originalResult)) {
670+
throwExceptionDueToAntiDependency();
671+
}
672+
}
673+
634674
private TableMetadata getTableMetadata(Operation operation) throws ExecutionException {
635675
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(operation);
636676
if (metadata == null) {

core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class SnapshotTest {
8989
.addColumn(ANY_NAME_4, DataType.TEXT)
9090
.addPartitionKey(ANY_NAME_1)
9191
.addClusteringKey(ANY_NAME_2)
92+
.addSecondaryIndex(ANY_NAME_4)
9293
.build());
9394

9495
private Snapshot snapshot;
@@ -186,6 +187,15 @@ private Get prepareAnotherGet() {
186187
.forTable(ANY_TABLE_NAME);
187188
}
188189

190+
private Get prepareGetWithIndex() {
191+
Key indexKey = new Key(ANY_NAME_4, ANY_TEXT_1);
192+
return Get.newBuilder()
193+
.namespace(ANY_NAMESPACE_NAME)
194+
.table(ANY_TABLE_NAME)
195+
.indexKey(indexKey)
196+
.build();
197+
}
198+
189199
private Scan prepareScan() {
190200
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
191201
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
@@ -206,6 +216,15 @@ private Scan prepareScanWithLimit(int limit) {
206216
.build();
207217
}
208218

219+
private Scan prepareScanWithIndex() {
220+
Key indexKey = new Key(ANY_NAME_4, ANY_TEXT_1);
221+
return Scan.newBuilder()
222+
.namespace(ANY_NAMESPACE_NAME)
223+
.table(ANY_TABLE_NAME)
224+
.indexKey(indexKey)
225+
.build();
226+
}
227+
209228
private Scan prepareCrossPartitionScan() {
210229
return prepareCrossPartitionScan(ANY_NAMESPACE_NAME, ANY_TABLE_NAME);
211230
}
@@ -1010,6 +1029,83 @@ public void toSerializable_ReadSetExtended_ShouldThrowValidationConflictExceptio
10101029
verify(storage).get(getWithProjections);
10111030
}
10121031

1032+
@Test
1033+
public void toSerializable_GetSetWithGetWithIndex_ShouldProcessWithoutExceptions()
1034+
throws ExecutionException {
1035+
// Arrange
1036+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1037+
Get getWithIndex = prepareGetWithIndex();
1038+
TransactionResult txResult = prepareResult(ANY_ID + "x");
1039+
snapshot.putIntoGetSet(getWithIndex, Optional.of(txResult));
1040+
DistributedStorage storage = mock(DistributedStorage.class);
1041+
Scan scanWithIndex =
1042+
prepareScanWithIndex().withProjections(Arrays.asList(Attribute.ID, ANY_NAME_1, ANY_NAME_2));
1043+
Scanner scanner = mock(Scanner.class);
1044+
when(scanner.one()).thenReturn(Optional.of(txResult)).thenReturn(Optional.empty());
1045+
when(storage.scan(scanWithIndex)).thenReturn(scanner);
1046+
1047+
// Act Assert
1048+
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();
1049+
1050+
// Assert
1051+
verify(storage).scan(scanWithIndex);
1052+
}
1053+
1054+
@Test
1055+
public void
1056+
toSerializable_GetSetWithGetWithIndex_RecordInserted_ShouldThrowValidationConflictException()
1057+
throws ExecutionException {
1058+
// Arrange
1059+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1060+
Get getWithIndex = prepareGetWithIndex();
1061+
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_2);
1062+
TransactionResult result2 = prepareResult(ANY_ID + "xx", ANY_TEXT_1, ANY_TEXT_3);
1063+
snapshot.putIntoGetSet(getWithIndex, Optional.of(result1));
1064+
DistributedStorage storage = mock(DistributedStorage.class);
1065+
Scan scanWithIndex =
1066+
prepareScanWithIndex().withProjections(Arrays.asList(Attribute.ID, ANY_NAME_1, ANY_NAME_2));
1067+
Scanner scanner = mock(Scanner.class);
1068+
when(scanner.one())
1069+
.thenReturn(Optional.of(result1))
1070+
.thenReturn(Optional.of(result2))
1071+
.thenReturn(Optional.empty());
1072+
when(storage.scan(scanWithIndex)).thenReturn(scanner);
1073+
1074+
// Act Assert
1075+
assertThatThrownBy(() -> snapshot.toSerializable(storage))
1076+
.isInstanceOf(ValidationConflictException.class);
1077+
1078+
// Assert
1079+
verify(storage).scan(scanWithIndex);
1080+
}
1081+
1082+
@Test
1083+
public void
1084+
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMySelf_ShouldProcessWithoutExceptions()
1085+
throws ExecutionException {
1086+
// Arrange
1087+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1088+
Get getWithIndex = prepareGetWithIndex();
1089+
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_2);
1090+
TransactionResult result2 = prepareResult(ANY_ID, ANY_TEXT_1, ANY_TEXT_3);
1091+
snapshot.putIntoGetSet(getWithIndex, Optional.of(result1));
1092+
DistributedStorage storage = mock(DistributedStorage.class);
1093+
Scan scanWithIndex =
1094+
prepareScanWithIndex().withProjections(Arrays.asList(Attribute.ID, ANY_NAME_1, ANY_NAME_2));
1095+
Scanner scanner = mock(Scanner.class);
1096+
when(scanner.one())
1097+
.thenReturn(Optional.of(result1))
1098+
.thenReturn(Optional.of(result2))
1099+
.thenReturn(Optional.empty());
1100+
when(storage.scan(scanWithIndex)).thenReturn(scanner);
1101+
1102+
// Act Assert
1103+
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();
1104+
1105+
// Assert
1106+
verify(storage).scan(scanWithIndex);
1107+
}
1108+
10131109
@Test
10141110
public void toSerializable_ScanSetNotChanged_ShouldProcessWithoutExceptions()
10151111
throws ExecutionException {

0 commit comments

Comments
 (0)