Skip to content

Commit 367ea8d

Browse files
committed
Revisit behavior of multiple mutations for same record in transaction in Consensus Commit (#2340)
1 parent 62d2dd7 commit 367ea8d

File tree

7 files changed

+1652
-481
lines changed

7 files changed

+1652
-481
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

+4
Original file line number | Diff line number | Diff line change
@@ -654,6 +654,10 @@ public enum CoreError implements ScalarDbError {
654654
+ "Primary-key columns must not contain any of the following characters in Cosmos DB: ':', '/', '\\', '#', '?'. Value: %s",
655655
"",
656656
""),
657+
CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED(
658+
Category.USER_ERROR, "0146", "Inserting already-written data is not allowed", "", ""),
659+
CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED(
660+
Category.USER_ERROR, "0147", "Deleting already-inserted data is not allowed", "", ""),
657661

658662
//
659663
// Errors for the concurrency error category

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

+14 -20
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@
2626
import java.util.LinkedHashSet;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.Map.Entry;
3029
import java.util.Optional;
3130
import java.util.stream.Collectors;
3231
import javax.annotation.concurrent.NotThreadSafe;
@@ -101,9 +100,9 @@ void read(Snapshot.Key key, Get get) throws CrudException {
101100
// transaction read it first. However, we update it only if a get operation has no
102101
// conjunction or the result exists. This is because we don’t know whether the record
103102
// actually exists or not due to the conjunction.
104-
snapshot.put(key, result);
103+
snapshot.putIntoReadSet(key, result);
105104
}
106-
snapshot.put(get, result); // for re-read and validation
105+
snapshot.putIntoGetSet(get, result); // for re-read and validation
107106
return;
108107
}
109108
throw new UncommittedRecordException(
@@ -117,7 +116,7 @@ private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String>
117116
throws CrudException {
118117
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
119118
return snapshot
120-
.mergeResult(key, snapshot.get(get), get.getConjunctions())
119+
.getResult(key, get)
121120
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
122121
}
123122

@@ -139,18 +138,13 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
139138
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
140139
Scan scan = (Scan) prepareStorageSelection(originalScan);
141140

142-
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
143-
144-
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
141+
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.getResults(scan);
145142
if (resultsInSnapshot.isPresent()) {
146-
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
147-
snapshot
148-
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
149-
.ifPresent(result -> results.put(entry.getKey(), result));
150-
}
151-
return createScanResults(scan, originalProjections, results);
143+
return createScanResults(scan, originalProjections, resultsInSnapshot.get());
152144
}
153145

146+
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();
147+
154148
Scanner scanner = null;
155149
try {
156150
scanner = scanFromStorage(scan);
@@ -169,9 +163,9 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
169163
// We always update the read set to create before image by using the latest record (result)
170164
// because another conflicting transaction might have updated the record after this
171165
// transaction read it first.
172-
snapshot.put(key, Optional.of(result));
166+
snapshot.putIntoReadSet(key, Optional.of(result));
173167

174-
snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
168+
snapshot.getResult(key).ifPresent(value -> results.put(key, value));
175169
}
176170
} finally {
177171
if (scanner != null) {
@@ -182,7 +176,7 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
182176
}
183177
}
184178
}
185-
snapshot.put(scan, results);
179+
snapshot.putIntoScanSet(scan, results);
186180

187181
return createScanResults(scan, originalProjections, results);
188182
}
@@ -213,10 +207,10 @@ public void put(Put put) throws CrudException {
213207
read(key, createGet(key));
214208
}
215209
mutationConditionsValidator.checkIfConditionIsSatisfied(
216-
put, snapshot.getFromReadSet(key).orElse(null));
210+
put, snapshot.getResult(key).orElse(null));
217211
}
218212

219-
snapshot.put(key, put);
213+
snapshot.putIntoWriteSet(key, put);
220214
}
221215

222216
public void delete(Delete delete) throws CrudException {
@@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException {
227221
read(key, createGet(key));
228222
}
229223
mutationConditionsValidator.checkIfConditionIsSatisfied(
230-
delete, snapshot.getFromReadSet(key).orElse(null));
224+
delete, snapshot.getResult(key).orElse(null));
231225
}
232226

233-
snapshot.put(key, delete);
227+
snapshot.putIntoDeleteSet(key, delete);
234228
}
235229

236230
public void readIfImplicitPreReadEnabled() throws CrudException {

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

+75 -41
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99
import com.scalar.db.api.Get;
1010
import com.scalar.db.api.Operation;
1111
import com.scalar.db.api.Put;
12+
import com.scalar.db.api.PutBuilder;
1213
import com.scalar.db.api.Result;
1314
import com.scalar.db.api.Scan;
1415
import com.scalar.db.api.ScanAll;
@@ -29,6 +30,7 @@
2930
import java.util.Comparator;
3031
import java.util.HashMap;
3132
import java.util.HashSet;
33+
import java.util.LinkedHashMap;
3234
import java.util.List;
3335
import java.util.Map;
3436
import java.util.Map.Entry;
@@ -112,56 +114,105 @@ Isolation getIsolation() {
112114

113115
// Although this class is not thread-safe, this method is actually thread-safe because the readSet
114116
// is a concurrent map
115-
public void put(Key key, Optional<TransactionResult> result) {
117+
public void putIntoReadSet(Key key, Optional<TransactionResult> result) {
116118
readSet.put(key, result);
117119
}
118120

119121
// Although this class is not thread-safe, this method is actually thread-safe because the getSet
120122
// is a concurrent map
121-
public void put(Get get, Optional<TransactionResult> result) {
123+
public void putIntoGetSet(Get get, Optional<TransactionResult> result) {
122124
getSet.put(get, result);
123125
}
124126

125-
public void put(Scan scan, Map<Key, TransactionResult> results) {
127+
public void putIntoScanSet(Scan scan, Map<Key, TransactionResult> results) {
126128
scanSet.put(scan, results);
127129
}
128130

129-
public void put(Key key, Put put) {
131+
public void putIntoWriteSet(Key key, Put put) {
130132
if (deleteSet.containsKey(key)) {
131133
throw new IllegalArgumentException(
132134
CoreError.CONSENSUS_COMMIT_WRITING_ALREADY_DELETED_DATA_NOT_ALLOWED.buildMessage());
133135
}
134136
if (writeSet.containsKey(key)) {
137+
if (put.isInsertModeEnabled()) {
138+
throw new IllegalArgumentException(
139+
CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage());
140+
}
141+
135142
// merge the previous put in the write set and the new put
136143
Put originalPut = writeSet.get(key);
137-
put.getColumns().values().forEach(originalPut::withValue);
144+
PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
145+
put.getColumns().values().forEach(putBuilder::value);
146+
147+
// If the implicit pre-read is enabled for the new put, it should also be enabled for the
148+
// merged put. However, if the previous put is in insert mode, this doesn’t apply. This is
149+
// because, in insert mode, the read set is not used during the preparation phase. Therefore,
150+
// we only need to enable the implicit pre-read if the previous put is not in insert mode
151+
if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) {
152+
putBuilder.enableImplicitPreRead();
153+
}
154+
155+
writeSet.put(key, putBuilder.build());
138156
} else {
139157
writeSet.put(key, put);
140158
}
141159
}
142160

143-
public void put(Key key, Delete delete) {
144-
writeSet.remove(key);
161+
public void putIntoDeleteSet(Key key, Delete delete) {
162+
Put put = writeSet.get(key);
163+
if (put != null) {
164+
if (put.isInsertModeEnabled()) {
165+
throw new IllegalArgumentException(
166+
CoreError.CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED.buildMessage());
167+
}
168+
169+
writeSet.remove(key);
170+
}
171+
145172
deleteSet.put(key, delete);
146173
}
147174

175+
public List<Put> getPutsInWriteSet() {
176+
return new ArrayList<>(writeSet.values());
177+
}
178+
179+
public List<Delete> getDeletesInDeleteSet() {
180+
return new ArrayList<>(deleteSet.values());
181+
}
182+
148183
public boolean containsKeyInReadSet(Key key) {
149184
return readSet.containsKey(key);
150185
}
151186

152-
public Optional<TransactionResult> getFromReadSet(Key key) {
153-
return readSet.getOrDefault(key, Optional.empty());
187+
public boolean containsKeyInGetSet(Get get) {
188+
return getSet.containsKey(get);
154189
}
155190

156-
public List<Put> getPutsInWriteSet() {
157-
return new ArrayList<>(writeSet.values());
191+
public Optional<TransactionResult> getResult(Key key) throws CrudException {
192+
Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
193+
return mergeResult(key, result);
158194
}
159195

160-
public List<Delete> getDeletesInDeleteSet() {
161-
return new ArrayList<>(deleteSet.values());
196+
public Optional<TransactionResult> getResult(Key key, Get get) throws CrudException {
197+
Optional<TransactionResult> result = getSet.getOrDefault(get, Optional.empty());
198+
return mergeResult(key, result, get.getConjunctions());
199+
}
200+
201+
public Optional<Map<Snapshot.Key, TransactionResult>> getResults(Scan scan) throws CrudException {
202+
if (!scanSet.containsKey(scan)) {
203+
return Optional.empty();
204+
}
205+
206+
Map<Key, TransactionResult> results = new LinkedHashMap<>();
207+
for (Entry<Snapshot.Key, TransactionResult> entry : scanSet.get(scan).entrySet()) {
208+
mergeResult(entry.getKey(), Optional.of(entry.getValue()))
209+
.ifPresent(result -> results.put(entry.getKey(), result));
210+
}
211+
212+
return Optional.of(results);
162213
}
163214

164-
public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
215+
private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
165216
throws CrudException {
166217
if (deleteSet.containsKey(key)) {
167218
return Optional.empty();
@@ -175,7 +226,7 @@ public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResu
175226
}
176227
}
177228

178-
public Optional<TransactionResult> mergeResult(
229+
private Optional<TransactionResult> mergeResult(
179230
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
180231
throws CrudException {
181232
return mergeResult(key, result)
@@ -204,32 +255,6 @@ private TableMetadata getTableMetadata(Key key) throws CrudException {
204255
}
205256
}
206257

207-
private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
208-
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
209-
if (metadata == null) {
210-
throw new IllegalArgumentException(
211-
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
212-
}
213-
return metadata.getTableMetadata();
214-
}
215-
216-
public boolean containsKeyInGetSet(Get get) {
217-
return getSet.containsKey(get);
218-
}
219-
220-
public Optional<TransactionResult> get(Get get) {
221-
// We expect this method is called after putting the result of the get operation in the get set.
222-
assert getSet.containsKey(get);
223-
return getSet.get(get);
224-
}
225-
226-
public Optional<Map<Key, TransactionResult>> get(Scan scan) {
227-
if (scanSet.containsKey(scan)) {
228-
return Optional.ofNullable(scanSet.get(scan));
229-
}
230-
return Optional.empty();
231-
}
232-
233258
public void verify(Scan scan) {
234259
if (isWriteSetOverlappedWith(scan)) {
235260
throw new IllegalArgumentException(
@@ -531,6 +556,15 @@ void toSerializableWithExtraRead(DistributedStorage storage)
531556
parallelExecutor.validate(tasks, getId());
532557
}
533558

559+
private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
560+
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
561+
if (metadata == null) {
562+
throw new IllegalArgumentException(
563+
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
564+
}
565+
return metadata.getTableMetadata();
566+
}
567+
534568
private boolean isChanged(
535569
Optional<TransactionResult> latestResult, Optional<TransactionResult> result) {
536570
if (latestResult.isPresent() != result.isPresent()) {

core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java

+4 -4
Original file line number | Diff line number | Diff line change
@@ -119,8 +119,8 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() {
119119
// different partition
120120
Put put1 = preparePut1();
121121
Put put2 = preparePut2();
122-
snapshot.put(new Snapshot.Key(put1), put1);
123-
snapshot.put(new Snapshot.Key(put2), put2);
122+
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
123+
snapshot.putIntoWriteSet(new Snapshot.Key(put2), put2);
124124

125125
return snapshot;
126126
}
@@ -137,8 +137,8 @@ private Snapshot prepareSnapshotWithSamePartitionPut() {
137137
// same partition
138138
Put put1 = preparePut1();
139139
Put put3 = preparePut3();
140-
snapshot.put(new Snapshot.Key(put1), put1);
141-
snapshot.put(new Snapshot.Key(put3), put3);
140+
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
141+
snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3);
142142

143143
return snapshot;
144144
}

0 commit comments

Comments
 (0)