Skip to content

Commit

Permalink
Follow-up to CASSANDRA-20228:
Browse files Browse the repository at this point in the history
 - Fix AccordUpdateParameters to correctly supply List cell paths derived from applyAt
 - Fix ExecuteAtSerialize.serialiseNullable
 - Fix topologies flush regression caused by markRetired forcing a flush on every call

patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-20228
  • Loading branch information
belliottsmith committed Jan 27, 2025
1 parent 52d6b72 commit 5bbc4ae
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 89 deletions.
17 changes: 1 addition & 16 deletions src/java/org/apache/cassandra/cql3/UpdateParameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ public class UpdateParameters
public final TableMetadata metadata;
public final ClientState clientState;
public final QueryOptions options;
public final boolean constructingAccordBaseUpdate;

private final long nowInSec;
private final long timestamp;
protected final long timestamp;
private final int ttl;

private final DeletionTime deletionTime;
Expand All @@ -75,18 +74,6 @@ public UpdateParameters(TableMetadata metadata,
long nowInSec,
int ttl,
Map<DecoratedKey, Partition> prefetchedRows) throws InvalidRequestException
{
this(metadata, clientState, options, timestamp, nowInSec, ttl, prefetchedRows, false);
}

public UpdateParameters(TableMetadata metadata,
ClientState clientState,
QueryOptions options,
long timestamp,
long nowInSec,
int ttl,
Map<DecoratedKey, Partition> prefetchedRows,
boolean constructingAccordBaseUpdate) throws InvalidRequestException
{
this.metadata = metadata;
this.clientState = clientState;
Expand All @@ -104,8 +91,6 @@ public UpdateParameters(TableMetadata metadata,
// it to avoid potential confusion.
if (timestamp == Long.MIN_VALUE)
throw new InvalidRequestException(String.format("Out of bound timestamp, must be in [%d, %d]", Long.MIN_VALUE + 1, Long.MAX_VALUE));

this.constructingAccordBaseUpdate = constructingAccordBaseUpdate;
}

public <V> void newRow(Clustering<V> clustering) throws InvalidRequestException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,8 @@ final void addUpdates(UpdatesCollector collector,
local,
timestamp,
nowInSeconds,
requestTime,
constructingAccordBaseUpdate);
requestTime
);
for (ByteBuffer key : keys)
{
Validation.validateKey(metadata(), key);
Expand All @@ -945,7 +945,7 @@ final void addUpdates(UpdatesCollector collector,
if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty())
return;

UpdateParameters params = makeUpdateParameters(keys, clusterings, state, options, local, timestamp, nowInSeconds, requestTime, constructingAccordBaseUpdate);
UpdateParameters params = makeUpdateParameters(keys, clusterings, state, options, local, timestamp, nowInSeconds, requestTime);

for (ByteBuffer key : keys)
{
Expand Down Expand Up @@ -982,8 +982,7 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
boolean local,
long timestamp,
long nowInSeconds,
Dispatcher.RequestTime requestTime,
boolean constructingAccordBaseUpdate)
Dispatcher.RequestTime requestTime)
{
if (clusterings.contains(Clustering.STATIC_CLUSTERING))
return makeUpdateParameters(keys,
Expand All @@ -994,8 +993,8 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
local,
timestamp,
nowInSeconds,
requestTime,
constructingAccordBaseUpdate);
requestTime
);

return makeUpdateParameters(keys,
new ClusteringIndexNamesFilter(clusterings, false),
Expand All @@ -1005,8 +1004,8 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
local,
timestamp,
nowInSeconds,
requestTime,
constructingAccordBaseUpdate);
requestTime
);
}

private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
Expand All @@ -1017,8 +1016,7 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
boolean local,
long timestamp,
long nowInSeconds,
Dispatcher.RequestTime requestTime,
boolean constructingAccordBaseUpdate)
Dispatcher.RequestTime requestTime)
{
// Some lists operation requires reading
Map<DecoratedKey, Partition> lists =
Expand All @@ -1036,8 +1034,7 @@ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
getTimestamp(timestamp, options),
nowInSeconds,
getTimeToLive(options),
lists,
constructingAccordBaseUpdate);
lists);
}

public static abstract class Parsed extends QualifiedStatement
Expand Down
45 changes: 1 addition & 44 deletions src/java/org/apache/cassandra/cql3/terms/Lists.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,13 @@
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MultiElementType;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.TimeUUID;

import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
Expand All @@ -68,13 +66,6 @@ public abstract class Lists
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(Lists.class);

/**
* Sentinel value indicating the cell path should be replaced by Accord with one based on the transaction executeAt
*/
private static final TimeUUID ACCORD_CELL_PATH_SENTINEL_UUID = TimeUUID.atUnixMicrosWithLsb(0, 0);
public static final CellPath ACCORD_DUMMY_CELL_PATH = CellPath.create(ACCORD_CELL_PATH_SENTINEL_UUID.toBytes());
private static final long ACCORD_CELL_PATH_SENTINEL_MSB = ACCORD_CELL_PATH_SENTINEL_UUID.msb();

private Lists() {}

public static ColumnSpecification indexSpecOf(ColumnSpecification column)
Expand Down Expand Up @@ -160,33 +151,6 @@ public static <T> ListType<?> getPreferredCompatibleType(List<T> items,
return type == null ? null : ListType.getInstance(type, false);
}

/**
* Return a function that given a cell with an ACCORD_CELL_PATH_SENTINEL_MSB will
* return a new CellPath with a TimeUUID that increases monotonically every time it is called or
* the existing cell path if path does not contain ACCORD_CELL_PATH_SENTINEL_MSB.
*
* Only intended to work with list cell paths where list append needs a timestamp based on the executeAt
* of the Accord transaction appending the cell.
* @param timestampMicros executeAt timestamp to use as the MSB for generated cell paths
*/
public static com.google.common.base.Function<Cell, CellPath> accordListPathSupplier(long timestampMicros)
{
return new com.google.common.base.Function<Cell, CellPath>()
{
final long timeUuidMsb = TimeUUID.unixMicrosToMsb(timestampMicros);
long cellIndex = 0;
@Override
public CellPath apply(Cell cell)
{
CellPath path = cell.path();
if (ACCORD_CELL_PATH_SENTINEL_MSB == path.get(0).getLong(0))
return CellPath.create(ByteBuffer.wrap(TimeUUID.toBytes(timeUuidMsb, TimeUUIDType.signedBytesToNativeLong(cellIndex++))));
else
return path;
}
};
}

public static class Literal extends Term.Raw
{
private final List<Term.Raw> elements;
Expand Down Expand Up @@ -463,17 +427,10 @@ static void doAppend(Term.Terminal value, ColumnMetadata column, UpdateParameter
// during SSTable write.
Guardrails.itemsPerCollection.guard(type.collectionSize(elements), column.name.toString(), false, params.clientState);

long cellIndex = 0;
int dataSize = 0;
for (ByteBuffer buffer : elements)
{
ByteBuffer cellPath;
// Accord will need to replace this value later once it knows the executeAt timestamp
// so just put a TimeUUID with MSB sentinel for now
if (params.constructingAccordBaseUpdate)
cellPath = TimeUUID.atUnixMicrosWithLsb(0, cellIndex++).toBytes();
else
cellPath = ByteBuffer.wrap(params.nextTimeUUIDAsBytes());
ByteBuffer cellPath = ByteBuffer.wrap(params.nextTimeUUIDAsBytes());
Cell<?> cell = params.addCell(column, CellPath.create(cellPath), buffer);
dataSize += cell.dataSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,20 +900,16 @@ public static EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskStat
diskState = maybeUpdateMaxEpoch(diskState, epoch);
String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
"SET closed = closed + ? WHERE epoch = ?";
executeInternal(cql,
KeySerializers.rangesToBlobMap(ranges), epoch);
executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch);
return diskState;
}

// TODO (required): unused
public static EpochDiskState markRetired(Ranges ranges, long epoch, EpochDiskState diskState)
{
diskState = maybeUpdateMaxEpoch(diskState, epoch);
String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
"SET retired = retired + ? WHERE epoch = ?";
executeInternal(cql,
KeySerializers.rangesToBlobMap(ranges), epoch);
flush(Topologies);
executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch);
return diskState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private static void skip(DataInputPlus in, boolean nullable) throws IOException
{
if ((flags & 1) != 0)
return;
flags >>= 1;
flags >>>= 1;
}
in.readUnsignedVInt();
in.readUnsignedVInt();
Expand Down Expand Up @@ -240,7 +240,7 @@ private static void serialize(Timestamp executeAt, DataOutputPlus out, boolean n
out.writeUnsignedVInt(executeAt.epoch());
out.writeUnsignedVInt(executeAt.hlc());
out.writeUnsignedVInt32(executeAt.node.id);
if ((flags & HAS_UNIQUE_HLC) != 0)
if (executeAt.hasDistinctHlcAndUniqueHlc())
out.writeUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc());
}
}
Expand All @@ -267,7 +267,7 @@ private static long serializedSize(Timestamp executeAt, boolean nullable)
size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch());
size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc());
size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
if ((flags & HAS_UNIQUE_HLC) != 0)
if (executeAt.hasDistinctHlcAndUniqueHlc())
size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc());
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.UpdateParameters;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.utils.TimeUUID;

import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
Expand All @@ -46,6 +50,22 @@ public AccordUpdateParameters(TxnData data, QueryOptions options, long timestamp
this.timestamp = timestamp;
}

static class RowUpdateParameters extends UpdateParameters
{
private long timeUuidNanos;

public RowUpdateParameters(TableMetadata metadata, ClientState clientState, QueryOptions options, long timestamp, long nowInSec, int ttl, Map<DecoratedKey, Partition> prefetchedRows) throws InvalidRequestException
{
super(metadata, clientState, options, timestamp, nowInSec, ttl, prefetchedRows);
}

@Override
public byte[] nextTimeUUIDAsBytes()
{
return TimeUUID.toBytes(Ballot.unixMicrosToMsb(timestamp), TimeUUIDType.signedBytesToNativeLong(timeUuidNanos++));
}
}

public TxnData getData()
{
return data;
Expand All @@ -60,13 +80,13 @@ public UpdateParameters updateParameters(TableMetadata metadata, DecoratedKey dk

// TODO : How should Accord work with TTL?
int ttl = metadata.params.defaultTimeToLive;
return new UpdateParameters(metadata,
disabledGuardrails,
options,
timestamp,
MICROSECONDS.toSeconds(timestamp),
ttl,
prefetchRow(metadata, dk, rowIndex));
return new RowUpdateParameters(metadata,
disabledGuardrails,
options,
timestamp,
MICROSECONDS.toSeconds(timestamp),
ttl,
prefetchRow(metadata, dk, rowIndex));
}

private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, DecoratedKey dk, int index)
Expand Down

0 comments on commit 5bbc4ae

Please sign in to comment.