Skip to content

Commit

Permalink
Fix backwards compatibility issues with Obj.referenced (projectnessie…
Browse files Browse the repository at this point in the history
…#10218)

While this is a real bug fix, the underlying issue does not affect data correctness.

`Obj.referenced()` was introduced to track when a particular `Obj` was (last) written. This information is crucial when purging unreferenced data in Nessie's persistence backend database.

The current code accidentally uses the sentinel value `0` for `Obj.referenced()` in _two_ scenarios: to indicate that the value is not set and needs to be written with the actual timestamp, and also when reading old rows that do not have the `referenced` column set (aka `NULL`).

This change updates the code to introduce `-1` for `referenced` as a sentinel for "absent" (NULL).

There is also a bug in the implementations that prevents "purge unreferenced data" from actually working for rows that do not have a value in the `referenced` column. This change fixes this. Unfortunately, not all databases (DynamoDB and BigTable) properly support checking for the absence of a value.
  • Loading branch information
snazy authored Jan 21, 2025
1 parent f8da3d6 commit e129315
Show file tree
Hide file tree
Showing 16 changed files with 272 additions and 70 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ as necessary. Empty sections will not end in the release notes.

### Fixes

- Fix an issue that prevents the Nessie Server Admin tool from purging unreferenced data in the backend
database, for data written before Nessie version 0.101.0.

### Commits

## [0.101.3] Release (2024-12-18)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,18 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
Filter condition =
FILTERS
.chain()
.filter(FILTERS.qualifier().exactMatch(QUALIFIER_OBJ_REFERENCED))
.filter(FILTERS.value().exactMatch(copyFromUtf8(Long.toString(obj.referenced()))));
Filter condition;
if (obj.referenced() != -1L) {
condition =
FILTERS
.chain()
.filter(FILTERS.qualifier().exactMatch(QUALIFIER_OBJ_REFERENCED))
.filter(FILTERS.value().exactMatch(copyFromUtf8(Long.toString(obj.referenced()))));
} else {
// We take a risk here in case the given object does _not_ have a referenced() value (old
// object). It's sadly not possible to check for the _absence_ of a cell.
condition = FILTERS.pass();
}

ConditionalRowMutation conditionalRowMutation =
conditionalRowMutation(obj, condition, Mutation.create().deleteRow());
Expand Down Expand Up @@ -653,12 +660,15 @@ static <M extends MutationApi<M>> M objToMutation(
}
mutation
.setCell(FAMILY_OBJS, QUALIFIER_OBJS, CELL_TIMESTAMP, unsafeWrap(serialized))
.setCell(FAMILY_OBJS, QUALIFIER_OBJ_TYPE, CELL_TIMESTAMP, objTypeValue)
.setCell(
FAMILY_OBJS,
QUALIFIER_OBJ_REFERENCED,
CELL_TIMESTAMP,
copyFromUtf8(Long.toString(referenced)));
.setCell(FAMILY_OBJS, QUALIFIER_OBJ_TYPE, CELL_TIMESTAMP, objTypeValue);
if (obj.referenced() != -1L) {
// -1 is a sentinel for AbstractBasePersistTests.deleteWithReferenced()
mutation.setCell(
FAMILY_OBJS,
QUALIFIER_OBJ_REFERENCED,
CELL_TIMESTAMP,
copyFromUtf8(Long.toString(referenced)));
}
UpdateableObj.extractVersionToken(obj)
.map(ByteString::copyFromUtf8)
.ifPresent(bs -> mutation.setCell(FAMILY_OBJS, QUALIFIER_OBJ_VERS, CELL_TIMESTAMP, bs));
Expand Down Expand Up @@ -756,7 +766,7 @@ private Obj objFromRow(Row row) {
List<RowCell> objReferenced = row.getCells(FAMILY_OBJS, QUALIFIER_OBJ_REFERENCED);
long referenced =
objReferenced.isEmpty()
? 0L
? -1L
: Long.parseLong(objReferenced.get(0).getValue().toStringUtf8());
List<RowCell> objCells = row.getCells(FAMILY_OBJS, QUALIFIER_OBJS);
ByteBuffer obj = objCells.get(0).getValue().asReadOnlyByteBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ public <T extends Obj> T[] fetchTypedObjsIfExist(
}
ObjId id = deserializeObjId(row.getString(COL_OBJ_ID.name()));
String versionToken = row.getString(COL_OBJ_VERS.name());
long referenced = row.getLong(COL_OBJ_REFERENCED.name());
String colReferenced = COL_OBJ_REFERENCED.name();
long referenced = row.isNull(colReferenced) ? -1 : row.getLong(colReferenced);
@SuppressWarnings("unchecked")
T typed =
(T)
Expand Down Expand Up @@ -345,13 +346,21 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
long referenced = obj.referenced();
BoundStatement stmt =
backend.buildStatement(
DELETE_OBJ_REFERENCED,
false,
config.repositoryId(),
serializeObjId(obj.id()),
obj.referenced());
referenced != -1L
? backend.buildStatement(
DELETE_OBJ_REFERENCED,
false,
config.repositoryId(),
serializeObjId(obj.id()),
referenced)
// We take a risk here in case the given object does _not_ have a referenced() value
// (old object).
// Cassandra's conditional DELETE ... IF doesn't allow us to use "IF col IS NULL" or "IF
// (col = 0 OR col IS NULL)".
: backend.buildStatement(
DELETE_OBJ, false, config.repositoryId(), serializeObjId(obj.id()));
return backend.executeCas(stmt);
}

Expand Down Expand Up @@ -391,8 +400,13 @@ public boolean updateConditional(@Nonnull UpdateableObj expected, @Nonnull Updat
.setString(COL_OBJ_ID.name(), serializeObjId(id))
.setString(COL_OBJ_TYPE.name() + EXPECTED_SUFFIX, type.name())
.setString(COL_OBJ_VERS.name() + EXPECTED_SUFFIX, expectedVersion)
.setString(COL_OBJ_VERS.name(), newVersion)
.setLong(COL_OBJ_REFERENCED.name(), referenced);
.setString(COL_OBJ_VERS.name(), newVersion);
if (newValue.referenced() != -1L) {
// -1 is a sentinel for AbstractBasePersistTests.deleteWithReferenced()
stmt = stmt.setLong(COL_OBJ_REFERENCED.name(), referenced);
} else {
stmt = stmt.setToNull(COL_OBJ_REFERENCED.name());
}

serializer.serialize(
newValue.withReferenced(referenced),
Expand Down Expand Up @@ -531,9 +545,14 @@ private <R> R writeSingleObj(
.newBoundStatementBuilder(serializer.insertCql(upsert), upsert)
.setString(COL_REPO_ID.name(), config.repositoryId())
.setString(COL_OBJ_ID.name(), serializeObjId(id))
.setLong(COL_OBJ_REFERENCED.name(), referenced)
.setString(COL_OBJ_TYPE.name(), type.name())
.setString(COL_OBJ_VERS.name(), versionToken);
if (obj.referenced() != -1L) {
// -1 is a sentinel for AbstractBasePersistTests.deleteWithReferenced()
stmt = stmt.setLong(COL_OBJ_REFERENCED.name(), referenced);
} else {
stmt = stmt.setToNull(COL_OBJ_REFERENCED.name());
}

serializer.serialize(
obj,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ public <T extends Obj> T[] fetchTypedObjsIfExist(
ObjId id = deserializeObjId(row.getByteBuffer(COL_OBJ_ID.name()));
String versionToken = row.getString(COL_OBJ_VERS.name());
ByteBuffer serialized = row.getByteBuffer(COL_OBJ_VALUE.name());
long referenced = row.getLong(COL_OBJ_REFERENCED.name());
String colReferenced = COL_OBJ_REFERENCED.name();
long referenced = row.isNull(colReferenced) ? -1 : row.getLong(colReferenced);
return typeClass.cast(deserializeObj(id, referenced, serialized, versionToken));
};

Expand Down Expand Up @@ -345,13 +346,21 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
var referenced = obj.referenced();
BoundStatement stmt =
backend.buildStatement(
DELETE_OBJ_REFERENCED,
false,
config.repositoryId(),
serializeObjId(obj.id()),
obj.referenced());
referenced != -1L
? backend.buildStatement(
DELETE_OBJ_REFERENCED,
false,
config.repositoryId(),
serializeObjId(obj.id()),
referenced)
// We take a risk here in case the given object does _not_ have a referenced() value
// (old object).
// Cassandra's conditional DELETE ... IF doesn't allow us to use "IF col IS NULL" or "IF
// (col = 0 OR col IS NULL)".
: backend.buildStatement(
DELETE_OBJ, false, config.repositoryId(), serializeObjId(obj.id()));
return backend.executeCas(stmt);
}

Expand Down Expand Up @@ -397,8 +406,13 @@ public boolean updateConditional(@Nonnull UpdateableObj expected, @Nonnull Updat
.setString(COL_OBJ_TYPE.name() + EXPECTED_SUFFIX, type.shortName())
.setString(COL_OBJ_VERS.name() + EXPECTED_SUFFIX, expectedVersion)
.setString(COL_OBJ_VERS.name(), newVersion)
.setByteBuffer(COL_OBJ_VALUE.name(), ByteBuffer.wrap(serialized))
.setLong(COL_OBJ_REFERENCED.name(), referenced);
.setByteBuffer(COL_OBJ_VALUE.name(), ByteBuffer.wrap(serialized));
if (newValue.referenced() != -1L) {
// -1 is a sentinel for AbstractBasePersistTests.deleteWithReferenced()
stmt = stmt.setLong(COL_OBJ_REFERENCED.name(), referenced);
} else {
stmt = stmt.setToNull(COL_OBJ_REFERENCED.name());
}

return backend.executeCas(stmt.build());
}
Expand Down Expand Up @@ -538,10 +552,15 @@ private <R> R writeSingleObj(
.newBoundStatementBuilder(upsert ? UPSERT_OBJ : STORE_OBJ, upsert)
.setString(COL_REPO_ID.name(), config.repositoryId())
.setByteBuffer(COL_OBJ_ID.name(), serializeObjId(id))
.setLong(COL_OBJ_REFERENCED.name(), referenced)
.setString(COL_OBJ_TYPE.name(), type.shortName())
.setString(COL_OBJ_VERS.name(), versionToken)
.setByteBuffer(COL_OBJ_VALUE.name(), ByteBuffer.wrap(serialized));
if (obj.referenced() != -1L) {
// -1 is a sentinel for AbstractBasePersistTests.deleteWithReferenced()
stmt = stmt.setLong(COL_OBJ_REFERENCED.name(), referenced);
} else {
stmt = stmt.setToNull(COL_OBJ_REFERENCED.name());
}

return consumer.apply(stmt.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.assertj.core.api.InstanceOfAssertFactories.LONG;
import static org.assertj.core.api.InstanceOfAssertFactories.list;
import static org.assertj.core.api.InstanceOfAssertFactories.type;
import static org.junit.jupiter.params.provider.Arguments.arguments;
Expand Down Expand Up @@ -140,6 +141,46 @@ public class AbstractBasePersistTests {

@NessiePersist protected Persist persist;

/**
 * Exercises {@code Persist.deleteWithReferenced(Obj)} for objects stored with and without a
 * {@code referenced} value.
 *
 * <p>The first scenario stores an object without an explicit {@code referenced} value, expects a
 * positive value when it is read back, and verifies that the delete only succeeds when the object
 * passed in carries the value that was read back. The second and third scenarios store objects
 * with the test sentinel {@code -1}, expect {@code -1} when they are read back, and expect the
 * delete to succeed.
 *
 * <p>The test is skipped when {@code persist.isCaching()} returns {@code true}.
 * NOTE(review): presumably because {@code Obj.referenced()} is not consistent for caching
 * implementations - confirm.
 */
@SuppressWarnings("DataFlowIssue")
@Test
public void deleteWithReferenced() throws Exception {
assumeThat(persist.isCaching()).isFalse();

// Do NOT use any batch store operation here - implementations are only adapted to "respect" the
// test-sentinel value -1 for exactly this .storeObj() signature!

// Scenario 1: object stored without an explicit 'referenced' value.
var objWithReferenced = SimpleTestObj.builder().id(randomObjId()).text("foo").build();
persist.storeObj(objWithReferenced, true);
var readWithReferenced = persist.fetchObj(objWithReferenced.id());
// The object read back must carry a positive 'referenced' value.
soft.assertThat(readWithReferenced).extracting(Obj::referenced, LONG).isGreaterThan(0);
// Deleting with the locally built instance must fail and leave the object in place; that
// instance does not carry the value that was read back (presumably referenced() == 0 - confirm).
soft.assertThat(persist.deleteWithReferenced(objWithReferenced)).isFalse();
soft.assertThatCode(() -> persist.fetchObj(objWithReferenced.id())).doesNotThrowAnyException();
// A different, non-matching 'referenced' value must not delete the object either.
soft.assertThat(persist.deleteWithReferenced(objWithReferenced.withReferenced(Long.MAX_VALUE)))
.isFalse();
soft.assertThatCode(() -> persist.fetchObj(objWithReferenced.id())).doesNotThrowAnyException();
// Deleting with the instance that was read back succeeds, and the object is gone afterwards.
soft.assertThat(persist.deleteWithReferenced(readWithReferenced)).isTrue();
soft.assertThatCode(() -> persist.fetchObj(objWithReferenced.id()))
.isInstanceOf(ObjNotFoundException.class);

// Scenario 2: object stored with the test sentinel -1 as its 'referenced' value.
var objWithoutReferenced1 =
SimpleTestObj.builder().referenced(-1L).id(randomObjId()).text("foo").build();
persist.storeObj(objWithoutReferenced1, true);
var readWithoutReferenced1 = persist.fetchObj(objWithoutReferenced1.id());
// Reading such an object back is expected to yield -1.
soft.assertThat(readWithoutReferenced1).extracting(Obj::referenced, LONG).isEqualTo(-1L);
// Deleting with the -1 instance is expected to succeed and remove the object.
soft.assertThat(persist.deleteWithReferenced(objWithoutReferenced1)).isTrue();
soft.assertThatCode(() -> persist.fetchObj(objWithoutReferenced1.id()))
.isInstanceOf(ObjNotFoundException.class);

// Scenario 3: performs the same steps as scenario 2 with a second, freshly generated object id.
var objWithoutReferenced2 =
SimpleTestObj.builder().referenced(-1L).id(randomObjId()).text("foo").build();
persist.storeObj(objWithoutReferenced2, true);
var readWithoutReferenced2 = persist.fetchObj(objWithoutReferenced2.id());
soft.assertThat(readWithoutReferenced2).extracting(Obj::referenced, LONG).isEqualTo(-1L);
soft.assertThat(persist.deleteWithReferenced(objWithoutReferenced2)).isTrue();
soft.assertThatCode(() -> persist.fetchObj(objWithoutReferenced2.id()))
.isInstanceOf(ObjNotFoundException.class);
}

@ParameterizedTest
@MethodSource
public void genericObj(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ public interface Obj {
* <p>The value of this attribute is generated exclusively by the {@link Persist} implementations.
*
* <p>This attribute is <em>not</em> consistent when using a caching {@link Persist}.
*
* <p>When <em>reading</em> an object, this value is either {@code -1}, which means that no value
* is stored for the object (for example because it was written using a Nessie version that did
* not have this attribute), or a (positive) timestamp when the object was written.
*
* <p>When <em>storing</em> an object, this value <em>must</em> be {@code 0}. The only one
* exception is for tests that exercise the relevant code paths - those tests do also use {@code
* -1} as a sentinel to write "NULL".
*
* <p>In any case it is <em>illegal</em> to refer to and/or interpret this attribute from code
* that does not have to deal explicitly with this value, only code that runs maintenance
* operations shall use this value.
*/
@JsonIgnore
@JacksonInject(OBJ_REFERENCED_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Condition;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
Expand Down Expand Up @@ -549,16 +550,21 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
public boolean deleteWithReferenced(@Nonnull Obj obj) {
ObjId id = obj.id();

Map<String, ExpectedAttributeValue> expectedValues =
Map.of(
COL_OBJ_REFERENCED,
ExpectedAttributeValue.builder().value(fromS(Long.toString(obj.referenced()))).build());
var deleteItemRequest =
DeleteItemRequest.builder().tableName(backend.tableObjs).key(objKeyMap(id));
if (obj.referenced() != -1L) {
// We take a risk here in case the given object does _not_ have a referenced() value
// (old object). It's not possible in DynamoDB to check for '== 0 OR IS ABSENT/NULL'.
deleteItemRequest.expected(
Map.of(
COL_OBJ_REFERENCED,
ExpectedAttributeValue.builder()
.value(fromS(Long.toString(obj.referenced())))
.build()));
}

try {
backend
.client()
.deleteItem(
b -> b.tableName(backend.tableObjs).key(objKeyMap(id)).expected(expectedValues));
backend.client().deleteItem(deleteItemRequest.build());
return true;
} catch (ConditionalCheckFailedException checkFailedException) {
return false;
Expand Down Expand Up @@ -663,7 +669,7 @@ private <T extends Obj> T itemToObj(
Map<String, AttributeValue> inner = item.get(serializer.attributeName()).m();
String versionToken = attributeToString(item, COL_OBJ_VERS);
String referencedString = attributeToString(item, COL_OBJ_REFERENCED);
long referenced = referencedString != null ? Long.parseLong(referencedString) : 0L;
long referenced = referencedString != null ? Long.parseLong(referencedString) : -1L;
@SuppressWarnings("unchecked")
T typed = (T) serializer.fromMap(id, type, referenced, inner, versionToken);
return typed;
Expand All @@ -680,7 +686,10 @@ private Map<String, AttributeValue> objToItem(
Map<String, AttributeValue> inner = new HashMap<>();
item.put(KEY_NAME, objKey(id));
item.put(COL_OBJ_TYPE, fromS(type.shortName()));
item.put(COL_OBJ_REFERENCED, fromS(Long.toString(referenced)));
if (obj.referenced() != -1) {
// -1 is a sentinel for AbstractBasePersistTests.deleteWithReferenced()
item.put(COL_OBJ_REFERENCED, fromS(Long.toString(referenced)));
}
UpdateableObj.extractVersionToken(obj).ifPresent(token -> item.put(COL_OBJ_VERS, fromS(token)));
int incrementalIndexSizeLimit =
ignoreSoftSizeRestrictions ? Integer.MAX_VALUE : effectiveIncrementalIndexSizeLimit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Condition;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
Expand Down Expand Up @@ -550,16 +551,21 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
public boolean deleteWithReferenced(@Nonnull Obj obj) {
ObjId id = obj.id();

Map<String, ExpectedAttributeValue> expectedValues =
Map.of(
COL_OBJ_REFERENCED,
ExpectedAttributeValue.builder().value(fromS(Long.toString(obj.referenced()))).build());
var deleteItemRequest =
DeleteItemRequest.builder().tableName(backend.tableObjs).key(objKeyMap(id));
if (obj.referenced() != -1L) {
// We take a risk here in case the given object does _not_ have a referenced() value
// (old object). It's not possible in DynamoDB to check for '== 0 OR IS ABSENT/NULL'.
deleteItemRequest.expected(
Map.of(
COL_OBJ_REFERENCED,
ExpectedAttributeValue.builder()
.value(fromS(Long.toString(obj.referenced())))
.build()));
}

try {
backend
.client()
.deleteItem(
b -> b.tableName(backend.tableObjs).key(objKeyMap(id)).expected(expectedValues));
backend.client().deleteItem(deleteItemRequest.build());
return true;
} catch (ConditionalCheckFailedException checkFailedException) {
return false;
Expand Down Expand Up @@ -663,7 +669,7 @@ private <T extends Obj> T itemToObj(
ByteBuffer bin = item.get(COL_OBJ_VALUE).b().asByteBuffer();
String versionToken = attributeToString(item, COL_OBJ_VERS);
String referencedString = attributeToString(item, COL_OBJ_REFERENCED);
long referenced = referencedString != null ? Long.parseLong(referencedString) : 0L;
long referenced = referencedString != null ? Long.parseLong(referencedString) : -1L;
Obj obj = deserializeObj(id, referenced, bin, versionToken);
return typeClass.cast(obj);
}
Expand All @@ -677,7 +683,10 @@ private Map<String, AttributeValue> objToItem(
Map<String, AttributeValue> item = new HashMap<>();
item.put(KEY_NAME, objKey(id));
item.put(COL_OBJ_TYPE, fromS(type.shortName()));
item.put(COL_OBJ_REFERENCED, fromS(Long.toString(referenced)));
if (obj.referenced() != -1) {
// -1 is a sentinel for AbstractBasePersistTests.deleteWithReferenced()
item.put(COL_OBJ_REFERENCED, fromS(Long.toString(referenced)));
}
UpdateableObj.extractVersionToken(obj).ifPresent(token -> item.put(COL_OBJ_VERS, fromS(token)));
int incrementalIndexSizeLimit =
ignoreSoftSizeRestrictions ? Integer.MAX_VALUE : effectiveIncrementalIndexSizeLimit();
Expand Down
Loading

0 comments on commit e129315

Please sign in to comment.