Skip to content

Optimistic locking for delete scenario with TransactWriteItemsEnhancedRequest #6043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon DynamoDB Enhanced Client",
"contributor": "",
"description": "Optimistic locking while using DynamoDbEnhancedClient - DeleteItem with TransactWriteItemsEnhancedRequest"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package software.amazon.awssdk.enhanced.dynamodb;

import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -192,7 +191,7 @@ public void updateItem_returnValues_all_old() {
Record record = new Record().setId("1").setSort(10);
mappedTable.putItem(record).join();

Record updatedRecord = new Record().setId("1").setSort(10).setValue(11);
Record updatedRecord = new Record().setId("1").setSort(10).setValue(11).setVersion(1);


UpdateItemEnhancedResponse<Record> response = mappedTable.updateItemWithResponse(r -> r.item(updatedRecord)
Expand All @@ -202,14 +201,15 @@ public void updateItem_returnValues_all_old() {
assertThat(response.attributes().getId()).isEqualTo(record.getId());
assertThat(response.attributes().getSort()).isEqualTo(record.getSort());
assertThat(response.attributes().getValue()).isEqualTo(null);
assertThat(response.attributes().getVersion()).isEqualTo(1);
}

@Test
public void updateItem_returnValues_all_new() {
Record record = new Record().setId("1").setSort(10);
mappedTable.putItem(record).join();

Record updatedRecord = new Record().setId("1").setSort(10).setValue(11);
Record updatedRecord = new Record().setId("1").setSort(10).setValue(11).setVersion(1);


UpdateItemEnhancedResponse<Record> response = mappedTable.updateItemWithResponse(r -> r.item(updatedRecord)
Expand All @@ -219,14 +219,15 @@ public void updateItem_returnValues_all_new() {
assertThat(response.attributes().getId()).isEqualTo(updatedRecord.getId());
assertThat(response.attributes().getSort()).isEqualTo(updatedRecord.getSort());
assertThat(response.attributes().getValue()).isEqualTo(updatedRecord.getValue());
assertThat(response.attributes().getVersion()).isEqualTo(updatedRecord.getVersion() + 1);
}

@Test
public void updateItem_returnValues_not_set() {
Record record = new Record().setId("1").setSort(10);
mappedTable.putItem(record).join();

Record updatedRecord = new Record().setId("1").setSort(10).setValue(11);
Record updatedRecord = new Record().setId("1").setSort(10).setValue(11).setVersion(1);


UpdateItemEnhancedResponse<Record> response = mappedTable.updateItemWithResponse(r -> r.item(updatedRecord))
Expand All @@ -235,6 +236,7 @@ public void updateItem_returnValues_not_set() {
assertThat(response.attributes().getId()).isEqualTo(updatedRecord.getId());
assertThat(response.attributes().getSort()).isEqualTo(updatedRecord.getSort());
assertThat(response.attributes().getValue()).isEqualTo(updatedRecord.getValue());
assertThat(response.attributes().getVersion()).isEqualTo(updatedRecord.getVersion() + 1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertThrows;

import org.assertj.core.data.Offset;
import org.junit.After;
Expand All @@ -30,6 +31,7 @@
import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedResponse;
import software.amazon.awssdk.enhanced.dynamodb.model.Record;
import software.amazon.awssdk.enhanced.dynamodb.model.TransactWriteItemsEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedResponse;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand All @@ -40,6 +42,7 @@
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity;
import software.amazon.awssdk.services.dynamodb.model.ReturnItemCollectionMetrics;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;

public class CrudWithResponseIntegrationTest extends DynamoDbEnhancedIntegrationTestBase {

Expand Down Expand Up @@ -218,6 +221,54 @@ public void deleteItem_returnValuesOnConditionCheckFailure_set_returnValuesOnCon
.satisfies(e -> assertThat(((ConditionalCheckFailedException) e).hasItem()).isFalse());
}

@Test
public void deleteItemWithTransactWrite_shouldFailIfVersionMismatch() {
Record originalItem = new Record().setId("123").setSort(10).setStringAttribute("Original Item");
Key recordKey = Key.builder()
.partitionValue(originalItem.getId())
.sortValue(originalItem.getSort())
.build();

mappedTable.putItem(originalItem);

// Retrieve the item and modify it separately
Record modifiedItem = mappedTable.getItem(r -> r.key(recordKey));
modifiedItem.setStringAttribute("Updated Item");

// Update the item, which will increment the version
mappedTable.updateItem(modifiedItem);

// Now attempt to delete the original item using a transaction
TransactWriteItemsEnhancedRequest request = TransactWriteItemsEnhancedRequest.builder()
.addDeleteItem(mappedTable, modifiedItem)
.build();

assertThrows(TransactionCanceledException.class, () -> enhancedClient.transactWriteItems(request));
}

@Test
public void deleteItemWithTransactWrite_shouldSucceedIfVersionMatch() {
Record originalItem = new Record().setId("123").setSort(10).setStringAttribute("Original Item");
Key recordKey = Key.builder()
.partitionValue(originalItem.getId())
.sortValue(originalItem.getSort())
.build();
mappedTable.putItem(originalItem);

// Retrieve the item
Record retrievedItem = mappedTable.getItem(r -> r.key(recordKey));

// Delete the item using a transaction
TransactWriteItemsEnhancedRequest request = TransactWriteItemsEnhancedRequest.builder()
.addDeleteItem(mappedTable, retrievedItem)
.build();

enhancedClient.transactWriteItems(request);

Record deletedItem = mappedTable.getItem(r -> r.key(recordKey));
assertThat(deletedItem).isNull();
}

@Test
public void deleteItem_returnValuesOnConditionCheckFailure_set_returnValuesOnConditionCheckFailureNotNull() {
Record record = new Record().setId("1").setSort(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.enhanced.dynamodb;

import static software.amazon.awssdk.enhanced.dynamodb.extensions.VersionedRecordExtension.AttributeTags.versionAttribute;
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.primaryPartitionKey;
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.primarySortKey;
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.secondaryPartitionKey;
Expand Down Expand Up @@ -73,6 +74,10 @@ protected static DynamoDbAsyncClient createAsyncDynamoDbClient() {
.addAttribute(String.class, a -> a.name("stringAttribute")
.getter(Record::getStringAttribute)
.setter(Record::setStringAttribute))
.addAttribute(Integer.class, a -> a.name("version")
.getter(Record::getVersion)
.setter(Record::setVersion)
.tags(versionAttribute()))
.build();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public static void setup() {
mappedTable = enhancedClient.table(TABLE_NAME, TABLE_SCHEMA);
mappedTable.createTable();
dynamoDbClient.waiter().waitUntilTableExists(r -> r.tableName(TABLE_NAME));
RECORDS.forEach(record -> mappedTable.putItem(r -> r.item(record)));
RECORDS.forEach(record -> record.setVersion(1));
}

@AfterClass
Expand All @@ -68,14 +70,8 @@ public static void teardown() {
}
}

private void insertRecords() {
RECORDS.forEach(record -> mappedTable.putItem(r -> r.item(record)));
}

@Test
public void scan_withoutReturnConsumedCapacity_checksPageCount() {
insertRecords();

Iterator<Page<Record>> results = mappedTable.scan(ScanEnhancedRequest.builder().limit(5).build())
.iterator();
Page<Record> page1 = results.next();
Expand All @@ -97,8 +93,6 @@ public void scan_withoutReturnConsumedCapacity_checksPageCount() {

@Test
public void scan_withReturnConsumedCapacityAndDifferentReadConsistency_checksConsumedCapacity() {
insertRecords();

Iterator<Page<Record>> eventualConsistencyResult =
mappedTable.scan(ScanEnhancedRequest.builder().returnConsumedCapacity(ReturnConsumedCapacity.TOTAL).build())
.iterator();
Expand All @@ -122,8 +116,6 @@ public void scan_withReturnConsumedCapacityAndDifferentReadConsistency_checksCon

@Test
public void query_withoutReturnConsumedCapacity_checksPageCount() {
insertRecords();

Iterator<Page<Record>> results =
mappedTable.query(QueryEnhancedRequest.builder()
.queryConditional(sortBetween(k-> k.partitionValue("id-value").sortValue(2),
Expand Down Expand Up @@ -151,8 +143,6 @@ public void query_withoutReturnConsumedCapacity_checksPageCount() {

@Test
public void query_withReturnConsumedCapacityAndDifferentReadConsistency_checksConsumedCapacity() {
insertRecords();

Iterator<Page<Record>> eventualConsistencyResult =
mappedTable.query(QueryEnhancedRequest.builder()
.queryConditional(sortGreaterThan(k -> k.partitionValue("id-value").sortValue(3)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package software.amazon.awssdk.enhanced.dynamodb.model;

import java.util.Objects;
import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbVersionAttribute;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;

@DynamoDbBean
public class Record {

private String id;
Expand All @@ -26,6 +29,7 @@ public class Record {
private Integer gsiSort;

private String stringAttribute;
private Integer version;

public String getId() {
return id;
Expand Down Expand Up @@ -81,6 +85,16 @@ public Record setStringAttribute(String stringAttribute) {
return this;
}

@DynamoDbVersionAttribute
public Integer getVersion() {
return version;
}

public Record setVersion(Integer version) {
this.version = version;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -91,11 +105,12 @@ public boolean equals(Object o) {
Objects.equals(value, record.value) &&
Objects.equals(gsiId, record.gsiId) &&
Objects.equals(stringAttribute, record.stringAttribute) &&
Objects.equals(gsiSort, record.gsiSort);
Objects.equals(gsiSort, record.gsiSort) &&
Objects.equals(version, record.version);
}

@Override
public int hashCode() {
return Objects.hash(id, sort, value, gsiId, gsiSort, stringAttribute);
return Objects.hash(id, sort, value, gsiId, gsiSort, stringAttribute, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,15 @@ default WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite conte
default ReadModification afterRead(DynamoDbExtensionContext.AfterRead context) {
return ReadModification.builder().build();
}

/**
* This hook is called just before an operation is going to delete data from the database. The extension that
* implements this method can add a condition to the delete operation.
*
* @param context The {@link DynamoDbExtensionContext.BeforeDelete} context containing the state of the execution.
* @return A {@link Expression} object that can alter the behavior of the delete operation.
*/
default Expression beforeDelete(DynamoDbExtensionContext.BeforeDelete context) {
return Expression.builder().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ public interface BeforeWrite extends Context {
@ThreadSafe
public interface AfterRead extends Context {
}

/**
* The state of the execution when the {@link DynamoDbEnhancedClientExtension#beforeDelete} method is invoked.
*/
@SdkPublicApi
@ThreadSafe
public interface BeforeDelete extends Context {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,40 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
.build();
}

@Override
public Expression beforeDelete(DynamoDbExtensionContext.BeforeDelete context) {
Optional<String> versionAttributeKey = context.tableMetadata()
.customMetadataObject(CUSTOM_METADATA_KEY, String.class);

if (!versionAttributeKey.isPresent()) {
return Expression.builder().build();
}

String attributeKeyRef = keyRef(versionAttributeKey.get());
Expression condition;
Optional<AttributeValue> existingVersionValue =
Optional.ofNullable(context.items().get(versionAttributeKey.get()));

if (!existingVersionValue.isPresent() || isNullAttributeValue(existingVersionValue.get())) {
throw new IllegalArgumentException("Version attribute is null.");
} else {
if (existingVersionValue.get().n() == null) {
// In this case a non-null version attribute is present, but it's not an N
throw new IllegalArgumentException("Version attribute appears to be the wrong type. N is required.");
}

String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER.apply(versionAttributeKey.get());
condition = Expression.builder()
.expression(String.format("%s = %s", attributeKeyRef, existingVersionValueKey))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey.get()))
.expressionValues(Collections.singletonMap(existingVersionValueKey,
existingVersionValue.get()))
.build();
}

return condition;
}

@NotThreadSafe
public static final class Builder {
private Builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,35 @@ public ReadModification afterRead(DynamoDbExtensionContext.AfterRead context) {
.transformedItem(transformedItem)
.build();
}

/**
* Implementation of the {@link DynamoDbEnhancedClientExtension} interface that will call all the chained extensions
* in forward order, passing the results of each one to the next and coalescing the results into a single expression.
* Multiple conditional statements will be separated by the string " AND ".
*
* @param context A {@link DynamoDbExtensionContext.BeforeDelete} context
* @return A single {@link Expression} representing the coalesced results of all the chained extensions.
*/
@Override
public Expression beforeDelete(DynamoDbExtensionContext.BeforeDelete context) {
Expression conditionalExpression = null;

for (DynamoDbEnhancedClientExtension extension : this.extensionChain) {

DynamoDbExtensionContext.BeforeDelete beforeDelete =
DefaultDynamoDbExtensionContext.builder()
.items(context.items())
.operationContext(context.operationContext())
.tableMetadata(context.tableMetadata())
.tableSchema(context.tableSchema())
.build();

Expression additionalConditionExpression = extension.beforeDelete(beforeDelete);

conditionalExpression = mergeConditionalExpressions(conditionalExpression,
additionalConditionExpression);
}

return conditionalExpression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
*/
@SdkInternalApi
public final class DefaultDynamoDbExtensionContext implements DynamoDbExtensionContext.BeforeWrite,
DynamoDbExtensionContext.AfterRead {
DynamoDbExtensionContext.AfterRead,
DynamoDbExtensionContext.BeforeDelete {
private final Map<String, AttributeValue> items;
private final OperationContext operationContext;
private final TableMetadata tableMetadata;
Expand Down
Loading