Skip to content

Commit c33f655

Browse files
authored
use writer UUID + sequence number as a request ID for transactional writes (#94)
* use writer UUID + sequence number as a request ID for transactional writes * limit token to 36 characters * Pack UUID bits and seqNr into base64 string
1 parent b221b5c commit c33f655

File tree

1 file changed

+18
-0
lines changed

1 file changed

+18
-0
lines changed

core/src/main/scala/akka/persistence/dynamodb/internal/JournalDao.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44

55
package akka.persistence.dynamodb.internal
66

7+
import java.nio.ByteBuffer
78
import java.time.Instant
89
import java.util.concurrent.CompletionException
10+
import java.util.Base64
911
import java.util.{ HashMap => JHashMap }
12+
import java.util.UUID
1013

1114
import scala.concurrent.ExecutionContext
1215
import scala.concurrent.Future
@@ -39,6 +42,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
3942
@InternalApi private[akka] object JournalDao {
4043
private val log: Logger = LoggerFactory.getLogger(classOf[JournalDao])
4144

45+
private val base64Encoder = Base64.getEncoder
4246
}
4347

4448
/**
@@ -124,8 +128,22 @@ import software.amazon.awssdk.services.dynamodb.model.Update
124128
.build()
125129
}.asJava
126130

131+
val token = {
132+
val firstEvent = events.head
133+
val uuid = UUID.fromString(firstEvent.writerUuid)
134+
val seqNr = firstEvent.seqNr
135+
val bb = ByteBuffer.allocate(24)
136+
bb.asLongBuffer()
137+
.put(uuid.getMostSignificantBits)
138+
.put(uuid.getLeastSignificantBits)
139+
.put(seqNr)
140+
141+
new String(base64Encoder.encode(bb.array))
142+
}
143+
127144
val req = TransactWriteItemsRequest
128145
.builder()
146+
.clientRequestToken(token)
129147
.transactItems(writeItems)
130148
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
131149
.build()

0 commit comments

Comments
 (0)