diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index 66ae2bc2691e..e4ceec16fb7f 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -51,10 +51,9 @@ public class UpdateParameters public final TableMetadata metadata; public final ClientState clientState; public final QueryOptions options; - public final boolean constructingAccordBaseUpdate; private final long nowInSec; - private final long timestamp; + protected final long timestamp; private final int ttl; private final DeletionTime deletionTime; @@ -75,18 +74,6 @@ public UpdateParameters(TableMetadata metadata, long nowInSec, int ttl, Map prefetchedRows) throws InvalidRequestException - { - this(metadata, clientState, options, timestamp, nowInSec, ttl, prefetchedRows, false); - } - - public UpdateParameters(TableMetadata metadata, - ClientState clientState, - QueryOptions options, - long timestamp, - long nowInSec, - int ttl, - Map prefetchedRows, - boolean constructingAccordBaseUpdate) throws InvalidRequestException { this.metadata = metadata; this.clientState = clientState; @@ -104,8 +91,6 @@ public UpdateParameters(TableMetadata metadata, // it to avoid potential confusion. if (timestamp == Long.MIN_VALUE) throw new InvalidRequestException(String.format("Out of bound timestamp, must be in [%d, %d]", Long.MIN_VALUE + 1, Long.MAX_VALUE)); - - this.constructingAccordBaseUpdate = constructingAccordBaseUpdate; } public void newRow(Clustering clustering) throws InvalidRequestException diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 0231ab16fd11..11340815178a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -924,8 +924,8 @@ final void addUpdates(UpdatesCollector collector, local, timestamp, nowInSeconds, - requestTime, - constructingAccordBaseUpdate); + requestTime + ); for (ByteBuffer key : keys) { Validation.validateKey(metadata(), key); @@ -945,7 +945,7 @@ final void addUpdates(UpdatesCollector collector, if (restrictions.hasClusteringColumnsRestrictions() && clusterings.isEmpty()) return; - UpdateParameters params = makeUpdateParameters(keys, clusterings, state, options, local, timestamp, nowInSeconds, requestTime, constructingAccordBaseUpdate); + UpdateParameters params = makeUpdateParameters(keys, clusterings, state, options, local, timestamp, nowInSeconds, requestTime); for (ByteBuffer key : keys) { @@ -982,8 +982,7 @@ private UpdateParameters makeUpdateParameters(Collection keys, boolean local, long timestamp, long nowInSeconds, - Dispatcher.RequestTime requestTime, - boolean constructingAccordBaseUpdate) + Dispatcher.RequestTime requestTime) { if (clusterings.contains(Clustering.STATIC_CLUSTERING)) return makeUpdateParameters(keys, @@ -994,8 +993,8 @@ private UpdateParameters makeUpdateParameters(Collection keys, local, timestamp, nowInSeconds, - requestTime, - constructingAccordBaseUpdate); + requestTime + ); return makeUpdateParameters(keys, new ClusteringIndexNamesFilter(clusterings, false), @@ -1005,8 +1004,8 @@ private UpdateParameters makeUpdateParameters(Collection keys, local, timestamp, nowInSeconds, - requestTime, - constructingAccordBaseUpdate); + requestTime + ); } private UpdateParameters makeUpdateParameters(Collection keys, @@ -1017,8 +1016,7 @@ private UpdateParameters makeUpdateParameters(Collection keys, boolean local, long timestamp, long nowInSeconds, - Dispatcher.RequestTime requestTime, - boolean constructingAccordBaseUpdate) + Dispatcher.RequestTime requestTime) { // Some lists operation requires reading Map lists = @@ -1036,8 +1034,7 @@ private UpdateParameters makeUpdateParameters(Collection keys, getTimestamp(timestamp, options), nowInSeconds, getTimeToLive(options), - lists, - constructingAccordBaseUpdate); + lists); } public static abstract class Parsed extends QualifiedStatement diff --git a/src/java/org/apache/cassandra/cql3/terms/Lists.java b/src/java/org/apache/cassandra/cql3/terms/Lists.java index a0889e75ded0..9edd1e3f0bfd 100644 --- a/src/java/org/apache/cassandra/cql3/terms/Lists.java +++ b/src/java/org/apache/cassandra/cql3/terms/Lists.java @@ -44,7 +44,6 @@ import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MultiElementType; -import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.db.rows.ComplexColumnData; @@ -52,7 +51,6 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; @@ -68,13 +66,6 @@ public abstract class Lists @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(Lists.class); - /** - * Sentinel value indicating the cell path should be replaced by Accord with one based on the transaction executeAt - */ - private static final TimeUUID ACCORD_CELL_PATH_SENTINEL_UUID = TimeUUID.atUnixMicrosWithLsb(0, 0); - public static final CellPath ACCORD_DUMMY_CELL_PATH = CellPath.create(ACCORD_CELL_PATH_SENTINEL_UUID.toBytes()); - private static final long ACCORD_CELL_PATH_SENTINEL_MSB = ACCORD_CELL_PATH_SENTINEL_UUID.msb(); - private Lists() {} public static ColumnSpecification indexSpecOf(ColumnSpecification column) @@ -160,33 +151,6 @@ public static ListType getPreferredCompatibleType(List items, return type == null ? null : ListType.getInstance(type, false); } - /** - * Return a function that given a cell with an ACCORD_CELL_PATH_SENTINEL_MSB will - * return a new CellPath with a TimeUUID that increases monotonically every time it is called or - * the existing cell path if path does not contain ACCORD_CELL_PATH_SENTINEL_MSB. - * - * Only intended to work with list cell paths where list append needs a timestamp based on the executeAt - * of the Accord transaction appending the cell. - * @param timestampMicros executeAt timestamp to use as the MSB for generated cell paths - */ - public static com.google.common.base.Function accordListPathSupplier(long timestampMicros) - { - return new com.google.common.base.Function() - { - final long timeUuidMsb = TimeUUID.unixMicrosToMsb(timestampMicros); - long cellIndex = 0; - @Override - public CellPath apply(Cell cell) - { - CellPath path = cell.path(); - if (ACCORD_CELL_PATH_SENTINEL_MSB == path.get(0).getLong(0)) - return CellPath.create(ByteBuffer.wrap(TimeUUID.toBytes(timeUuidMsb, TimeUUIDType.signedBytesToNativeLong(cellIndex++)))); - else - return path; - } - }; - } - public static class Literal extends Term.Raw { private final List elements; @@ -463,17 +427,10 @@ static void doAppend(Term.Terminal value, ColumnMetadata column, UpdateParameter // during SSTable write. Guardrails.itemsPerCollection.guard(type.collectionSize(elements), column.name.toString(), false, params.clientState); - long cellIndex = 0; int dataSize = 0; for (ByteBuffer buffer : elements) { - ByteBuffer cellPath; - // Accord will need to replace this value later once it knows the executeAt timestamp - // so just put a TimeUUID with MSB sentinel for now - if (params.constructingAccordBaseUpdate) - cellPath = TimeUUID.atUnixMicrosWithLsb(0, cellIndex++).toBytes(); - else - cellPath = ByteBuffer.wrap(params.nextTimeUUIDAsBytes()); + ByteBuffer cellPath = ByteBuffer.wrap(params.nextTimeUUIDAsBytes()); Cell cell = params.addCell(column, CellPath.create(cellPath), buffer); dataSize += cell.dataSize(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 57aab6653a63..4a60f21e2b79 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -900,20 +900,16 @@ public static EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskStat diskState = maybeUpdateMaxEpoch(diskState, epoch); String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + "SET closed = closed + ? WHERE epoch = ?"; - executeInternal(cql, - KeySerializers.rangesToBlobMap(ranges), epoch); + executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch); return diskState; } - // TODO (required): unused public static EpochDiskState markRetired(Ranges ranges, long epoch, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + "SET retired = retired + ? WHERE epoch = ?"; - executeInternal(cql, - KeySerializers.rangesToBlobMap(ranges), epoch); - flush(Topologies); + executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch); return diskState; } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index 436849e29075..b65acb29bc64 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -208,7 +208,7 @@ private static void skip(DataInputPlus in, boolean nullable) throws IOException { if ((flags & 1) != 0) return; - flags >>= 1; + flags >>>= 1; } in.readUnsignedVInt(); in.readUnsignedVInt(); @@ -240,7 +240,7 @@ private static void serialize(Timestamp executeAt, DataOutputPlus out, boolean n out.writeUnsignedVInt(executeAt.epoch()); out.writeUnsignedVInt(executeAt.hlc()); out.writeUnsignedVInt32(executeAt.node.id); - if ((flags & HAS_UNIQUE_HLC) != 0) + if (executeAt.hasDistinctHlcAndUniqueHlc()) out.writeUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc()); } } @@ -267,7 +267,7 @@ private static long serializedSize(Timestamp executeAt, boolean nullable) size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch()); size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc()); size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id); - if ((flags & HAS_UNIQUE_HLC) != 0) + if (executeAt.hasDistinctHlcAndUniqueHlc()) size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc()); return size; } diff --git a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java index 24d3ef78aba4..1bf46889dd59 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java +++ b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java @@ -26,9 +26,13 @@ import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.UpdateParameters; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.partitions.Partition; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.utils.TimeUUID; import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.TimeUnit.MICROSECONDS; @@ -46,6 +50,22 @@ public AccordUpdateParameters(TxnData data, QueryOptions options, long timestamp this.timestamp = timestamp; } + static class RowUpdateParameters extends UpdateParameters + { + private long timeUuidNanos; + + public RowUpdateParameters(TableMetadata metadata, ClientState clientState, QueryOptions options, long timestamp, long nowInSec, int ttl, Map prefetchedRows) throws InvalidRequestException + { + super(metadata, clientState, options, timestamp, nowInSec, ttl, prefetchedRows); + } + + @Override + public byte[] nextTimeUUIDAsBytes() + { + return TimeUUID.toBytes(Ballot.unixMicrosToMsb(timestamp), TimeUUIDType.signedBytesToNativeLong(timeUuidNanos++)); + } + } + public TxnData getData() { return data; @@ -60,13 +80,13 @@ public UpdateParameters updateParameters(TableMetadata metadata, DecoratedKey dk // TODO : How should Accord work with TTL? int ttl = metadata.params.defaultTimeToLive; - return new UpdateParameters(metadata, - disabledGuardrails, - options, - timestamp, - MICROSECONDS.toSeconds(timestamp), - ttl, - prefetchRow(metadata, dk, rowIndex)); + return new RowUpdateParameters(metadata, + disabledGuardrails, + options, + timestamp, + MICROSECONDS.toSeconds(timestamp), + ttl, + prefetchRow(metadata, dk, rowIndex)); } private Map prefetchRow(TableMetadata metadata, DecoratedKey dk, int index)