Skip to content

Revisit behavior of multiple mutations for same record in transaction in Consensus Commit #2340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,8 @@ public enum CoreError implements ScalarDbError {
+ "Primary-key columns must not contain any of the following characters in Cosmos DB: ':', '/', '\\', '#', '?'. Value: %s",
"",
""),
CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED(
Category.USER_ERROR, "0146", "Inserting already-written data is not allowed", "", ""),

//
// Errors for the concurrency error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -101,9 +100,9 @@ void read(Snapshot.Key key, Get get) throws CrudException {
// transaction read it first. However, we update it only if a get operation has no
// conjunction or the result exists. This is because we don’t know whether the record
// actually exists or not due to the conjunction.
snapshot.put(key, result);
snapshot.putIntoReadSet(key, result);
}
snapshot.put(get, result); // for re-read and validation
snapshot.putIntoGetSet(get, result); // for re-read and validation
return;
}
throw new UncommittedRecordException(
Expand All @@ -117,7 +116,7 @@ private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String>
throws CrudException {
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
return snapshot
.mergeResult(key, snapshot.get(get), get.getConjunctions())
.getResult(key, get)
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
}

Expand All @@ -139,18 +138,13 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
Scan scan = (Scan) prepareStorageSelection(originalScan);

Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.getResults(scan);
if (resultsInSnapshot.isPresent()) {
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
snapshot
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}
return createScanResults(scan, originalProjections, results);
return createScanResults(scan, originalProjections, resultsInSnapshot.get());
}

Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Scanner scanner = null;
try {
scanner = scanFromStorage(scan);
Expand All @@ -169,9 +163,9 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
// We always update the read set to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, Optional.of(result));
snapshot.putIntoReadSet(key, Optional.of(result));

snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
snapshot.getResult(key).ifPresent(value -> results.put(key, value));
}
} finally {
if (scanner != null) {
Expand All @@ -182,7 +176,7 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
}
}
}
snapshot.put(scan, results);
snapshot.putIntoScanSet(scan, results);

return createScanResults(scan, originalProjections, results);
}
Expand Down Expand Up @@ -213,10 +207,10 @@ public void put(Put put) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
put, snapshot.getFromReadSet(key).orElse(null));
put, snapshot.getResult(key).orElse(null));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the added getResult() method to use the result that's merged with a mutation for the same record in the write set for conditional updates.

}

snapshot.put(key, put);
snapshot.putIntoWriteSet(key, put);
}

public void delete(Delete delete) throws CrudException {
Expand All @@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
delete, snapshot.getFromReadSet(key).orElse(null));
delete, snapshot.getResult(key).orElse(null));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Use the added getResult() method to use the result that's merged with a mutation for the same record in the write set for conditional deletes.

}

snapshot.put(key, delete);
snapshot.putIntoDeleteSet(key, delete);
}

public void readIfImplicitPreReadEnabled() throws CrudException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.scalar.db.api.Get;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.PutBuilder;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanAll;
Expand All @@ -30,6 +31,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -113,47 +115,55 @@ Isolation getIsolation() {

// Although this class is not thread-safe, this method is actually thread-safe because the readSet
// is a concurrent map
public void put(Key key, Optional<TransactionResult> result) {
public void putIntoReadSet(Key key, Optional<TransactionResult> result) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed the put methods in Snapshot to make them more explicit.

readSet.put(key, result);
}

// Although this class is not thread-safe, this method is actually thread-safe because the getSet
// is a concurrent map
public void put(Get get, Optional<TransactionResult> result) {
public void putIntoGetSet(Get get, Optional<TransactionResult> result) {
getSet.put(get, result);
}

public void put(Scan scan, Map<Key, TransactionResult> results) {
public void putIntoScanSet(Scan scan, Map<Key, TransactionResult> results) {
scanSet.put(scan, results);
}

public void put(Key key, Put put) {
public void putIntoWriteSet(Key key, Put put) {
if (deleteSet.containsKey(key)) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_WRITING_ALREADY_DELETED_DATA_NOT_ALLOWED.buildMessage());
}
if (writeSet.containsKey(key)) {
if (put.isInsertModeEnabled()) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage());
}
Comment on lines +138 to +141
Copy link
Collaborator Author

@brfrn169 brfrn169 Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added validation to ensure that Insert is not executed on an already written record in the same transaction.


// merge the previous put in the write set and the new put
Put originalPut = writeSet.get(key);
put.getColumns().values().forEach(originalPut::withValue);
PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
put.getColumns().values().forEach(putBuilder::value);

// If the implicit pre-read is enabled for the new put, it should also be enabled for the
// merged put. However, if the previous put is in insert mode, this doesn’t apply. This is
// because, in insert mode, the read set is not used during the preparation phase. Therefore,
// we only need to enable the implicit pre-read if the previous put is not in insert mode
if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) {
putBuilder.enableImplicitPreRead();
}

writeSet.put(key, putBuilder.build());
Comment on lines +145 to +156
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the merge logic for the write set. We should also merge the implicit pre-read, with one exception. Please see the comments for details.

} else {
writeSet.put(key, put);
}
}

public void put(Key key, Delete delete) {
public void putIntoDeleteSet(Key key, Delete delete) {
writeSet.remove(key);
deleteSet.put(key, delete);
}

public boolean containsKeyInReadSet(Key key) {
return readSet.containsKey(key);
}

public Optional<TransactionResult> getFromReadSet(Key key) {
return readSet.getOrDefault(key, Optional.empty());
}

public List<Put> getPutsInWriteSet() {
return new ArrayList<>(writeSet.values());
}
Expand All @@ -166,7 +176,39 @@ public ReadWriteSets getReadWriteSets() {
return new ReadWriteSets(id, readSet, writeSet.entrySet(), deleteSet.entrySet());
}

public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
public boolean containsKeyInReadSet(Key key) {
return readSet.containsKey(key);
}

public boolean containsKeyInGetSet(Get get) {
return getSet.containsKey(get);
}

public Optional<TransactionResult> getResult(Key key) throws CrudException {
Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
return mergeResult(key, result);
}

public Optional<TransactionResult> getResult(Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getSet.getOrDefault(get, Optional.empty());
return mergeResult(key, result, get.getConjunctions());
}

public Optional<Map<Snapshot.Key, TransactionResult>> getResults(Scan scan) throws CrudException {
if (!scanSet.containsKey(scan)) {
return Optional.empty();
}

Map<Key, TransactionResult> results = new LinkedHashMap<>();
for (Entry<Snapshot.Key, TransactionResult> entry : scanSet.get(scan).entrySet()) {
mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}

return Optional.of(results);
}
Comment on lines +196 to +218
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added getResult() and getResults() methods to Snapshot. These methods return results that are merged with mutations in the write set, if necessary. Following this change, we can make the mergeResult() methods private.


private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
throws CrudException {
if (deleteSet.containsKey(key)) {
return Optional.empty();
Expand All @@ -180,7 +222,7 @@ public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResu
}
}

public Optional<TransactionResult> mergeResult(
private Optional<TransactionResult> mergeResult(
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
throws CrudException {
return mergeResult(key, result)
Expand Down Expand Up @@ -209,32 +251,6 @@ private TableMetadata getTableMetadata(Key key) throws CrudException {
}
}

private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
if (metadata == null) {
throw new IllegalArgumentException(
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
}
return metadata.getTableMetadata();
}

public boolean containsKeyInGetSet(Get get) {
return getSet.containsKey(get);
}

public Optional<TransactionResult> get(Get get) {
// We expect this method is called after putting the result of the get operation in the get set.
assert getSet.containsKey(get);
return getSet.get(get);
}

public Optional<Map<Key, TransactionResult>> get(Scan scan) {
if (scanSet.containsKey(scan)) {
return Optional.ofNullable(scanSet.get(scan));
}
return Optional.empty();
}

public void verify(Scan scan) {
if (isWriteSetOverlappedWith(scan)) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -536,6 +552,15 @@ void toSerializableWithExtraRead(DistributedStorage storage)
parallelExecutor.validate(tasks, getId());
}

private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
if (metadata == null) {
throw new IllegalArgumentException(
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
}
return metadata.getTableMetadata();
}

private boolean isChanged(
Optional<TransactionResult> latestResult, Optional<TransactionResult> result) {
if (latestResult.isPresent() != result.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() {
// different partition
Put put1 = preparePut1();
Put put2 = preparePut2();
snapshot.put(new Snapshot.Key(put1), put1);
snapshot.put(new Snapshot.Key(put2), put2);
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
snapshot.putIntoWriteSet(new Snapshot.Key(put2), put2);

return snapshot;
}
Expand All @@ -148,8 +148,8 @@ private Snapshot prepareSnapshotWithSamePartitionPut() {
// same partition
Put put1 = preparePut1();
Put put3 = preparePut3();
snapshot.put(new Snapshot.Key(put1), put1);
snapshot.put(new Snapshot.Key(put3), put3);
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3);

return snapshot;
}
Expand Down
Loading