Skip to content

Commit f1342e8

Browse files
authored
bump: Akka core 2.10.1, r2dbc 3.1.2 (#1305)
* include slice in log * ignore scala-lang updates * akka-pki dependency, due to mixed versions
1 parent 6ec48a0 commit f1342e8

File tree

28 files changed

+103
-68
lines changed

28 files changed

+103
-68
lines changed

Diff for: .scala-steward.conf

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ updates.ignore = [
1212

1313
{groupId = "com.fasterxml.jackson.core" }
1414
{ groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.5." }
15+
16+
{ groupId = "org.scala-lang" }
1517
]
1618

1719
updatePullRequests = false

Diff for: akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetProjectionSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ class DynamoDBTimestampOffsetProjectionSpec
567567
env.sequenceNr,
568568
eventOption = None,
569569
env.timestamp,
570-
env.eventMetadata,
570+
env.internalEventMetadata,
571571
env.entityType,
572572
env.slice,
573573
env.filtered,
@@ -635,7 +635,7 @@ class DynamoDBTimestampOffsetProjectionSpec
635635
env.sequenceNr,
636636
env.eventOption,
637637
env.timestamp,
638-
env.eventMetadata,
638+
env.internalEventMetadata,
639639
env.entityType,
640640
env.slice,
641641
filtered = true,

Diff for: akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
131131
env.sequenceNr,
132132
eventOption = None,
133133
env.timestamp,
134-
env.eventMetadata,
134+
env.internalEventMetadata,
135135
env.entityType,
136136
env.slice,
137137
env.filtered,
@@ -144,7 +144,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
144144
env.sequenceNr,
145145
env.eventOption,
146146
env.timestamp,
147-
env.eventMetadata,
147+
env.internalEventMetadata,
148148
env.entityType,
149149
env.slice,
150150
filtered = true,

Diff for: akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
122122
.loadEnvelope[String](pid.id, sequenceNr = 1L)
123123
.futureValue
124124
env.filtered shouldBe true
125-
env.eventMetadata shouldBe None
125+
env.internalEventMetadata shouldBe None
126126
env.eventOption.isEmpty shouldBe true
127127
}
128128

Diff for: akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ class EventProducerServiceSpec
257257
EventProducerSource(entityType7, streamId7, transformation, settings)
258258
.withReplicatedEventMetadataTransformation(
259259
env =>
260-
if (env.eventMetadata.isDefined) None
260+
if (env.metadata[ReplicatedEventMetadata].isDefined) None
261261
else {
262262
// migrated from non-replicated, fill in metadata
263263
Some(

Diff for: akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/producer/scaladsl/TransformationSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {
4747
"transform low level with metadata" in {
4848
val transformer =
4949
Transformation.empty.registerAsyncEnvelopeMapper((env: EventEnvelope[String]) =>
50-
Future.successful(env.eventMetadata))
50+
Future.successful(env.metadata[String]))
5151
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
5252
}
5353

@@ -63,7 +63,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {
6363

6464
"fallback low level with metadata if no transformer exist for event" in {
6565
val transformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper((env: EventEnvelope[Any]) =>
66-
Future.successful(env.eventMetadata))
66+
Future.successful(env.metadata[String]))
6767
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
6868
}
6969

Diff for: akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala

+7-6
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ private[akka] object EventPusherConsumerServiceImpl {
7878
log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId)
7979
Future.successful(Done)
8080
} else {
81-
envelope.eventMetadata match {
82-
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
81+
envelope.metadata[ReplicatedEventMetadata] match {
82+
case Some(replicatedEventMetadata) =>
8383
// send event to entity in this replica
8484
val replicationId = ReplicationId.fromString(envelope.persistenceId)
8585
val destinationReplicaId = replicationId.withReplica(replicationSettings.selfReplicaId)
@@ -104,7 +104,8 @@ private[akka] object EventPusherConsumerServiceImpl {
104104
Some(
105105
new ReplicatedPublishedEventMetaData(
106106
replicatedEventMetadata.originReplica,
107-
replicatedEventMetadata.version)),
107+
replicatedEventMetadata.version,
108+
envelope.internalEventMetadata)),
108109
Some(replyTo)))
109110
}
110111

@@ -123,9 +124,9 @@ private[akka] object EventPusherConsumerServiceImpl {
123124
error))
124125
askResult
125126

126-
case unexpected =>
127+
case None =>
127128
throw new IllegalArgumentException(
128-
s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
129+
s"Missing replication metadata (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
129130
", is the remote entity really a Replicated Event Sourced Entity?")
130131
}
131132
}
@@ -141,7 +142,7 @@ private[akka] object EventPusherConsumerServiceImpl {
141142
event = envelope.eventOption.getOrElse(FilteredPayload),
142143
isSnapshotEvent = fromSnapshot(envelope),
143144
fillSequenceNumberGaps = fillSequenceNumberGaps,
144-
metadata = envelope.eventMetadata,
145+
metadata = envelope.internalEventMetadata,
145146
tags = envelope.tags,
146147
replyTo = replyTo))(d.settings.journalWriteTimeout, system.scheduler)
147148
}

Diff for: akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ import scala.concurrent.ExecutionContext
426426
val pid = env.persistenceId
427427

428428
// replicaId is used for validation of replay requests, to avoid replay for other replicas
429-
if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata]))
429+
if (replicaId.isEmpty && env.metadata[ReplicatedEventMetadata].isDefined)
430430
replicaId = Some(ReplicationId.fromString(pid).replicaId)
431431

432432
if (producerFilter(env) && filter.matches(env)) {

Diff for: akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private[akka] object ProtobufProtocolConversions {
113113

114114
def toEvent(transformedEvent: Any): Event = {
115115
val protoEvent = protoAnySerialization.serialize(transformedEvent)
116-
val metadata = env.eventMetadata.map(protoAnySerialization.serialize)
116+
val metadata = env.internalEventMetadata.map(protoAnySerialization.serialize)
117117
Event(
118118
persistenceId = env.persistenceId,
119119
seqNr = env.sequenceNr,
@@ -169,7 +169,7 @@ private[akka] object ProtobufProtocolConversions {
169169
event.seqNr,
170170
evt,
171171
eventOffset.timestamp.toEpochMilli,
172-
eventMetadata = metadata,
172+
_eventMetadata = metadata,
173173
PersistenceId.extractEntityType(event.persistenceId),
174174
event.slice,
175175
filtered = false,
@@ -188,7 +188,7 @@ private[akka] object ProtobufProtocolConversions {
188188
event.seqNr,
189189
eventOption = Some(serializedEvent),
190190
eventOffset.timestamp.toEpochMilli,
191-
eventMetadata = metadata,
191+
_eventMetadata = metadata,
192192
PersistenceId.extractEntityType(event.persistenceId),
193193
event.slice,
194194
filtered = false,
@@ -213,7 +213,7 @@ private[akka] object ProtobufProtocolConversions {
213213
filtered.seqNr,
214214
None,
215215
eventOffset.timestamp.toEpochMilli,
216-
eventMetadata = None,
216+
_eventMetadata = None,
217217
PersistenceId.extractEntityType(filtered.persistenceId),
218218
filtered.slice,
219219
filtered = true,

Diff for: akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ import akka.projection.grpc.internal.proto.ReplicaInfo
3636
if (envelope.eventOption.isEmpty)
3737
true
3838
else
39-
envelope.eventMetadata match {
40-
case Some(meta: ReplicatedEventMetadata) =>
39+
envelope.metadata[ReplicatedEventMetadata] match {
40+
case Some(meta) =>
4141
!exclude(meta.originReplica)
42-
case _ =>
42+
case None =>
4343
throw new IllegalArgumentException(
4444
s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})")
4545
}

Diff for: akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala

+10-8
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private[akka] object ReplicationImpl {
103103
settings.producerFilter)
104104
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
105105
.withReplicatedEventMetadataTransformation(env =>
106-
if (env.eventMetadata.isDefined) None
106+
if (env.metadata[ReplicatedEventMetadata].isDefined) None
107107
else {
108108
// migrated from non-replicated, fill in metadata
109109
Some(
@@ -218,8 +218,8 @@ private[akka] object ReplicationImpl {
218218
envelope.persistenceId) {
219219
case (envelope, _) =>
220220
if (!envelope.filtered) {
221-
envelope.eventMetadata match {
222-
case Some(replicatedEventMetadata: ReplicatedEventMetadata)
221+
envelope.metadata[ReplicatedEventMetadata] match {
222+
case Some(replicatedEventMetadata)
223223
if replicatedEventMetadata.originReplica == settings.selfReplicaId =>
224224
// skipping events originating from self replica (break cycle)
225225
if (log.isTraceEnabled)
@@ -231,7 +231,7 @@ private[akka] object ReplicationImpl {
231231
envelope.sequenceNr)
232232
Future.successful(Done)
233233

234-
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
234+
case Some(replicatedEventMetadata) =>
235235
val replicationId = ReplicationId.fromString(envelope.persistenceId)
236236
val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId)
237237
val entityRef =
@@ -251,9 +251,11 @@ private[akka] object ReplicationImpl {
251251
replicatedEventMetadata.originSequenceNr,
252252
envelope.event,
253253
envelope.timestamp,
254-
Some(new ReplicatedPublishedEventMetaData(
255-
replicatedEventMetadata.originReplica,
256-
replicatedEventMetadata.version)),
254+
Some(
255+
new ReplicatedPublishedEventMetaData(
256+
replicatedEventMetadata.originReplica,
257+
replicatedEventMetadata.version,
258+
envelope.internalEventMetadata)),
257259
Some(replyTo)))
258260
askResult.failed.foreach(error =>
259261
log.warn(
@@ -360,7 +362,7 @@ private[akka] object ReplicationImpl {
360362
settings.eventProducerSettings.withAkkaSerializationOnly())
361363
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
362364
.withReplicatedEventMetadataTransformation(env =>
363-
if (env.eventMetadata.isDefined) None
365+
if (env.metadata[ReplicatedEventMetadata].isDefined) None
364366
else {
365367
// migrated from non-replicated, fill in metadata
366368
Some(

Diff for: akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ class R2dbcTimestampOffsetProjectionSpec
395395
env.sequenceNr,
396396
eventOption = None,
397397
env.timestamp,
398-
env.eventMetadata,
398+
env.internalEventMetadata,
399399
env.entityType,
400400
env.slice,
401401
env.filtered,
@@ -484,7 +484,7 @@ class R2dbcTimestampOffsetProjectionSpec
484484
env.sequenceNr,
485485
env.eventOption,
486486
env.timestamp,
487-
env.eventMetadata,
487+
env.internalEventMetadata,
488488
env.entityType,
489489
env.slice,
490490
filtered = true,

Diff for: akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class R2dbcTimestampOffsetStoreSpec
9696
env.sequenceNr,
9797
eventOption = None,
9898
env.timestamp,
99-
env.eventMetadata,
99+
env.internalEventMetadata,
100100
env.entityType,
101101
env.slice,
102102
env.filtered,
@@ -109,7 +109,7 @@ class R2dbcTimestampOffsetStoreSpec
109109
env.sequenceNr,
110110
env.eventOption,
111111
env.timestamp,
112-
env.eventMetadata,
112+
env.internalEventMetadata,
113113
env.entityType,
114114
env.slice,
115115
filtered = true,

Diff for: akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ private[projection] class PostgresOffsetStoreDao(
196196
slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = {
197197
r2dbcExecutor.select("read timestamp offset")(
198198
conn => {
199-
logger.trace("reading timestamp offset for [{}]", projectionId)
199+
logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId)
200200
conn
201201
.createStatement(selectTimestampOffsetSql)
202202
.bind(0, slice)

Diff for: project/Dependencies.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ object Dependencies {
1616
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3
1717

1818
object Versions {
19-
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0")
19+
val Akka = sys.props.getOrElse("build.akka.version", "2.10.1")
2020
val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }
2121

2222
val Alpakka = "9.0.0"
@@ -35,7 +35,7 @@ object Dependencies {
3535
val AkkaPersistenceCassandra = "1.3.0"
3636
val AkkaPersistenceJdbc = "5.5.0"
3737

38-
val AkkaPersistenceR2dbc = "1.3.1"
38+
val AkkaPersistenceR2dbc = "1.3.2"
3939
val AkkaPersistenceR2dbcVersionInDocs = VersionNumber(AkkaPersistenceR2dbc).numbers match {
4040
case Seq(major, minor, _*) => s"$major.$minor"
4141
}

Diff for: samples/grpc/iot-service-java/pom.xml

+7-2
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
<properties>
1919
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
20-
<akka.version>2.10.0</akka.version>
20+
<akka.version>2.10.1</akka.version>
2121
<akka-projection.version>1.6.7</akka-projection.version>
22-
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
22+
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
2323
<akka-management.version>1.6.0</akka-management.version>
2424
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
2525
<akka-grpc.version>2.5.0</akka-grpc.version>
@@ -85,6 +85,11 @@
8585
<artifactId>akka-discovery_${scala.binary.version}</artifactId>
8686
<version>${akka.version}</version>
8787
</dependency>
88+
<dependency>
89+
<groupId>com.typesafe.akka</groupId>
90+
<artifactId>akka-pki_${scala.binary.version}</artifactId>
91+
<version>${akka.version}</version>
92+
</dependency>
8893
<dependency>
8994
<groupId>com.lightbend.akka</groupId>
9095
<artifactId>akka-persistence-r2dbc_${scala.binary.version}</artifactId>

Diff for: samples/grpc/iot-service-scala/build.sbt

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")
66

77
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
88

9-
scalaVersion := "2.13.15"
9+
scalaVersion := "2.13.16"
1010

1111
Compile / scalacOptions ++= Seq(
1212
"-release:11",
@@ -28,10 +28,10 @@ run / javaOptions ++= sys.props
2828
.fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res"))
2929
Global / cancelable := false // ctrl-c
3030

31-
val AkkaVersion = "2.10.0"
31+
val AkkaVersion = "2.10.1"
3232
val AkkaHttpVersion = "10.7.0"
3333
val AkkaManagementVersion = "1.6.0"
34-
val AkkaPersistenceR2dbcVersion = "1.3.1"
34+
val AkkaPersistenceR2dbcVersion = "1.3.2"
3535
val AkkaProjectionVersion =
3636
sys.props.getOrElse("akka-projection.version", "1.6.7")
3737
val AkkaDiagnosticsVersion = "2.2.0"

Diff for: samples/grpc/local-drone-control-java/pom.xml

+7-2
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
<properties>
1919
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
20-
<akka.version>2.10.0</akka.version>
20+
<akka.version>2.10.1</akka.version>
2121
<akka-projection.version>1.6.7</akka-projection.version>
22-
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
22+
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
2323
<akka-management.version>1.6.0</akka-management.version>
2424
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
2525
<akka-http.version>10.7.0</akka-http.version>
@@ -90,6 +90,11 @@
9090
<artifactId>akka-discovery_${scala.binary.version}</artifactId>
9191
<version>${akka.version}</version>
9292
</dependency>
93+
<dependency>
94+
<groupId>com.typesafe.akka</groupId>
95+
<artifactId>akka-pki_${scala.binary.version}</artifactId>
96+
<version>${akka.version}</version>
97+
</dependency>
9398
<dependency>
9499
<groupId>com.lightbend.akka</groupId>
95100
<artifactId>akka-persistence-r2dbc_${scala.binary.version}</artifactId>

Diff for: samples/grpc/local-drone-control-scala/autoscaling/simulator/build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
scalaVersion := "2.13.15"
1+
scalaVersion := "2.13.16"
22

33
enablePlugins(GatlingPlugin)
44

Diff for: samples/grpc/local-drone-control-scala/build.sbt

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")
66

77
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
88

9-
scalaVersion := "2.13.15"
9+
scalaVersion := "2.13.16"
1010

1111
Compile / scalacOptions ++= Seq(
1212
"-release:11",
@@ -30,10 +30,10 @@ run / javaOptions ++= sys.props
3030
.fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res"))
3131
Global / cancelable := false // ctrl-c
3232

33-
val AkkaVersion = "2.10.0"
33+
val AkkaVersion = "2.10.1"
3434
val AkkaHttpVersion = "10.7.0"
3535
val AkkaManagementVersion = "1.6.0"
36-
val AkkaPersistenceR2dbcVersion = "1.3.1"
36+
val AkkaPersistenceR2dbcVersion = "1.3.2"
3737
val AkkaProjectionVersion =
3838
sys.props.getOrElse("akka-projection.version", "1.6.7")
3939
val AkkaDiagnosticsVersion = "2.2.0"

0 commit comments

Comments
 (0)