Skip to content

Commit 3f20979

Browse files
authored
Revisit behavior of multiple mutations for same record in transaction in Consensus Commit (#2340)
1 parent cee15f3 commit 3f20979

File tree

7 files changed

+1665
-494
lines changed

7 files changed

+1665
-494
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

+4
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,10 @@ public enum CoreError implements ScalarDbError {
666666
+ "Primary-key columns must not contain any of the following characters in Cosmos DB: ':', '/', '\\', '#', '?'. Value: %s",
667667
"",
668668
""),
669+
CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED(
670+
Category.USER_ERROR, "0146", "Inserting already-written data is not allowed", "", ""),
671+
CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED(
672+
Category.USER_ERROR, "0147", "Deleting already-inserted data is not allowed", "", ""),
669673

670674
//
671675
// Errors for the concurrency error category

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

+14-20
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.LinkedHashSet;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.Map.Entry;
3029
import java.util.Optional;
3130
import java.util.stream.Collectors;
3231
import javax.annotation.concurrent.NotThreadSafe;
@@ -101,9 +100,9 @@ void read(Snapshot.Key key, Get get) throws CrudException {
101100
// transaction read it first. However, we update it only if a get operation has no
102101
// conjunction or the result exists. This is because we don’t know whether the record
103102
// actually exists or not due to the conjunction.
104-
snapshot.put(key, result);
103+
snapshot.putIntoReadSet(key, result);
105104
}
106-
snapshot.put(get, result); // for re-read and validation
105+
snapshot.putIntoGetSet(get, result); // for re-read and validation
107106
return;
108107
}
109108
throw new UncommittedRecordException(
@@ -117,7 +116,7 @@ private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String>
117116
throws CrudException {
118117
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
119118
return snapshot
120-
.mergeResult(key, snapshot.get(get), get.getConjunctions())
119+
.getResult(key, get)
121120
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
122121
}
123122

@@ -139,18 +138,13 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
139138
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
140139
Scan scan = (Scan) prepareStorageSelection(originalScan);
141140

142-
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
143-
144-
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
141+
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.getResults(scan);
145142
if (resultsInSnapshot.isPresent()) {
146-
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
147-
snapshot
148-
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
149-
.ifPresent(result -> results.put(entry.getKey(), result));
150-
}
151-
return createScanResults(scan, originalProjections, results);
143+
return createScanResults(scan, originalProjections, resultsInSnapshot.get());
152144
}
153145

146+
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
147+
154148
Scanner scanner = null;
155149
try {
156150
scanner = scanFromStorage(scan);
@@ -169,9 +163,9 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
169163
// We always update the read set to create before image by using the latest record (result)
170164
// because another conflicting transaction might have updated the record after this
171165
// transaction read it first.
172-
snapshot.put(key, Optional.of(result));
166+
snapshot.putIntoReadSet(key, Optional.of(result));
173167

174-
snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
168+
snapshot.getResult(key).ifPresent(value -> results.put(key, value));
175169
}
176170
} finally {
177171
if (scanner != null) {
@@ -182,7 +176,7 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
182176
}
183177
}
184178
}
185-
snapshot.put(scan, results);
179+
snapshot.putIntoScanSet(scan, results);
186180

187181
return createScanResults(scan, originalProjections, results);
188182
}
@@ -213,10 +207,10 @@ public void put(Put put) throws CrudException {
213207
read(key, createGet(key));
214208
}
215209
mutationConditionsValidator.checkIfConditionIsSatisfied(
216-
put, snapshot.getFromReadSet(key).orElse(null));
210+
put, snapshot.getResult(key).orElse(null));
217211
}
218212

219-
snapshot.put(key, put);
213+
snapshot.putIntoWriteSet(key, put);
220214
}
221215

222216
public void delete(Delete delete) throws CrudException {
@@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException {
227221
read(key, createGet(key));
228222
}
229223
mutationConditionsValidator.checkIfConditionIsSatisfied(
230-
delete, snapshot.getFromReadSet(key).orElse(null));
224+
delete, snapshot.getResult(key).orElse(null));
231225
}
232226

233-
snapshot.put(key, delete);
227+
snapshot.putIntoDeleteSet(key, delete);
234228
}
235229

236230
public void readIfImplicitPreReadEnabled() throws CrudException {

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

+76-42
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.scalar.db.api.Get;
1010
import com.scalar.db.api.Operation;
1111
import com.scalar.db.api.Put;
12+
import com.scalar.db.api.PutBuilder;
1213
import com.scalar.db.api.Result;
1314
import com.scalar.db.api.Scan;
1415
import com.scalar.db.api.ScanAll;
@@ -30,6 +31,7 @@
3031
import java.util.Comparator;
3132
import java.util.HashMap;
3233
import java.util.HashSet;
34+
import java.util.LinkedHashMap;
3335
import java.util.List;
3436
import java.util.Map;
3537
import java.util.Map.Entry;
@@ -113,45 +115,62 @@ Isolation getIsolation() {
113115

114116
// Although this class is not thread-safe, this method is actually thread-safe because the readSet
115117
// is a concurrent map
116-
public void put(Key key, Optional<TransactionResult> result) {
118+
public void putIntoReadSet(Key key, Optional<TransactionResult> result) {
117119
readSet.put(key, result);
118120
}
119121

120122
// Although this class is not thread-safe, this method is actually thread-safe because the getSet
121123
// is a concurrent map
122-
public void put(Get get, Optional<TransactionResult> result) {
124+
public void putIntoGetSet(Get get, Optional<TransactionResult> result) {
123125
getSet.put(get, result);
124126
}
125127

126-
public void put(Scan scan, Map<Key, TransactionResult> results) {
128+
public void putIntoScanSet(Scan scan, Map<Key, TransactionResult> results) {
127129
scanSet.put(scan, results);
128130
}
129131

130-
public void put(Key key, Put put) {
132+
public void putIntoWriteSet(Key key, Put put) {
131133
if (deleteSet.containsKey(key)) {
132134
throw new IllegalArgumentException(
133135
CoreError.CONSENSUS_COMMIT_WRITING_ALREADY_DELETED_DATA_NOT_ALLOWED.buildMessage());
134136
}
135137
if (writeSet.containsKey(key)) {
138+
if (put.isInsertModeEnabled()) {
139+
throw new IllegalArgumentException(
140+
CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage());
141+
}
142+
136143
// merge the previous put in the write set and the new put
137144
Put originalPut = writeSet.get(key);
138-
put.getColumns().values().forEach(originalPut::withValue);
145+
PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
146+
put.getColumns().values().forEach(putBuilder::value);
147+
148+
// If the implicit pre-read is enabled for the new put, it should also be enabled for the
149+
// merged put. However, if the previous put is in insert mode, this doesn’t apply. This is
150+
// because, in insert mode, the read set is not used during the preparation phase. Therefore,
151+
// we only need to enable the implicit pre-read if the previous put is not in insert mode
152+
if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) {
153+
putBuilder.enableImplicitPreRead();
154+
}
155+
156+
writeSet.put(key, putBuilder.build());
139157
} else {
140158
writeSet.put(key, put);
141159
}
142160
}
143161

144-
public void put(Key key, Delete delete) {
145-
writeSet.remove(key);
146-
deleteSet.put(key, delete);
147-
}
162+
public void putIntoDeleteSet(Key key, Delete delete) {
163+
Put put = writeSet.get(key);
164+
if (put != null) {
165+
if (put.isInsertModeEnabled()) {
166+
throw new IllegalArgumentException(
167+
CoreError.CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED.buildMessage());
168+
}
148169

149-
public boolean containsKeyInReadSet(Key key) {
150-
return readSet.containsKey(key);
151-
}
170+
writeSet.remove(key);
171+
}
152172

153-
public Optional<TransactionResult> getFromReadSet(Key key) {
154-
return readSet.getOrDefault(key, Optional.empty());
173+
deleteSet.put(key, delete);
155174
}
156175

157176
public List<Put> getPutsInWriteSet() {
@@ -166,7 +185,39 @@ public ReadWriteSets getReadWriteSets() {
166185
return new ReadWriteSets(id, readSet, writeSet.entrySet(), deleteSet.entrySet());
167186
}
168187

169-
public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
188+
public boolean containsKeyInReadSet(Key key) {
189+
return readSet.containsKey(key);
190+
}
191+
192+
public boolean containsKeyInGetSet(Get get) {
193+
return getSet.containsKey(get);
194+
}
195+
196+
public Optional<TransactionResult> getResult(Key key) throws CrudException {
197+
Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
198+
return mergeResult(key, result);
199+
}
200+
201+
public Optional<TransactionResult> getResult(Key key, Get get) throws CrudException {
202+
Optional<TransactionResult> result = getSet.getOrDefault(get, Optional.empty());
203+
return mergeResult(key, result, get.getConjunctions());
204+
}
205+
206+
public Optional<Map<Snapshot.Key, TransactionResult>> getResults(Scan scan) throws CrudException {
207+
if (!scanSet.containsKey(scan)) {
208+
return Optional.empty();
209+
}
210+
211+
Map<Key, TransactionResult> results = new LinkedHashMap<>();
212+
for (Entry<Snapshot.Key, TransactionResult> entry : scanSet.get(scan).entrySet()) {
213+
mergeResult(entry.getKey(), Optional.of(entry.getValue()))
214+
.ifPresent(result -> results.put(entry.getKey(), result));
215+
}
216+
217+
return Optional.of(results);
218+
}
219+
220+
private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
170221
throws CrudException {
171222
if (deleteSet.containsKey(key)) {
172223
return Optional.empty();
@@ -180,7 +231,7 @@ public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResu
180231
}
181232
}
182233

183-
public Optional<TransactionResult> mergeResult(
234+
private Optional<TransactionResult> mergeResult(
184235
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
185236
throws CrudException {
186237
return mergeResult(key, result)
@@ -209,32 +260,6 @@ private TableMetadata getTableMetadata(Key key) throws CrudException {
209260
}
210261
}
211262

212-
private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
213-
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
214-
if (metadata == null) {
215-
throw new IllegalArgumentException(
216-
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
217-
}
218-
return metadata.getTableMetadata();
219-
}
220-
221-
public boolean containsKeyInGetSet(Get get) {
222-
return getSet.containsKey(get);
223-
}
224-
225-
public Optional<TransactionResult> get(Get get) {
226-
// We expect this method is called after putting the result of the get operation in the get set.
227-
assert getSet.containsKey(get);
228-
return getSet.get(get);
229-
}
230-
231-
public Optional<Map<Key, TransactionResult>> get(Scan scan) {
232-
if (scanSet.containsKey(scan)) {
233-
return Optional.ofNullable(scanSet.get(scan));
234-
}
235-
return Optional.empty();
236-
}
237-
238263
public void verify(Scan scan) {
239264
if (isWriteSetOverlappedWith(scan)) {
240265
throw new IllegalArgumentException(
@@ -536,6 +561,15 @@ void toSerializableWithExtraRead(DistributedStorage storage)
536561
parallelExecutor.validate(tasks, getId());
537562
}
538563

564+
private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
565+
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
566+
if (metadata == null) {
567+
throw new IllegalArgumentException(
568+
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
569+
}
570+
return metadata.getTableMetadata();
571+
}
572+
539573
private boolean isChanged(
540574
Optional<TransactionResult> latestResult, Optional<TransactionResult> result) {
541575
if (latestResult.isPresent() != result.isPresent()) {

core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() {
130130
// different partition
131131
Put put1 = preparePut1();
132132
Put put2 = preparePut2();
133-
snapshot.put(new Snapshot.Key(put1), put1);
134-
snapshot.put(new Snapshot.Key(put2), put2);
133+
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
134+
snapshot.putIntoWriteSet(new Snapshot.Key(put2), put2);
135135

136136
return snapshot;
137137
}
@@ -148,8 +148,8 @@ private Snapshot prepareSnapshotWithSamePartitionPut() {
148148
// same partition
149149
Put put1 = preparePut1();
150150
Put put3 = preparePut3();
151-
snapshot.put(new Snapshot.Key(put1), put1);
152-
snapshot.put(new Snapshot.Key(put3), put3);
151+
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
152+
snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3);
153153

154154
return snapshot;
155155
}

0 commit comments

Comments
 (0)