diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala index 99079d3aeb..b659af89fa 100644 --- a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/PersistenceTestKitPlugin.scala @@ -83,6 +83,7 @@ class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends Asy .foreach { case (tag, (timestamp, highestSequenceNr)) => eventStream.publish(PersistenceTestKitPlugin.TagWrite(tag, timestamp, highestSequenceNr)) } + eventStream.publish(PersistenceTestKitPlugin.SliceWrite(aw.persistenceId, timestamp, aw.highestSequenceNr)) } result }))) @@ -130,6 +131,8 @@ object PersistenceTestKitPlugin { private[testkit] case class Write(persistenceId: String, toSequenceNr: Long) private[testkit] case class TagWrite(tag: String, timestamp: Long, highestSequenceNr: Long) + private[testkit] case class SliceWrite(persistenceId: String, timestamp: Long, highestSequenceNr: Long) + } /** diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/EventsBySliceStage.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/EventsBySliceStage.scala new file mode 100644 index 0000000000..24dc79811e --- /dev/null +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/EventsBySliceStage.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.testkit.query.internal + +import org.apache.pekko +import pekko.actor.ActorRef +import pekko.annotation.InternalApi +import pekko.persistence.Persistence +import pekko.persistence.query.typed +import pekko.persistence.query.Sequence +import pekko.persistence.testkit.EventStorage +import pekko.persistence.testkit.PersistenceTestKitPlugin.SliceWrite +import pekko.persistence.typed.PersistenceId +import pekko.stream.stage.GraphStage +import pekko.stream.stage.GraphStageLogic +import pekko.stream.stage.GraphStageLogicWithLogging +import pekko.stream.stage.OutHandler +import pekko.stream.Attributes +import pekko.stream.Outlet +import pekko.stream.SourceShape + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] object EventsBySliceStage { + // PersistenceTestKitPlugin increments timestamp for each atomic write, + // which can only contain a single persistence ID, + // so we only need to track timestamp and sequence number within state, + // because same timestamp will not have multiple persistence IDs. + case class State( + currentTimestamp: Long, + lastSequenceNr: Long + ) { + def isAfter(timestamp: Long, sequenceNr: Long): Boolean = { + timestamp > currentTimestamp || (timestamp == currentTimestamp && sequenceNr > lastSequenceNr) + } + } +} + +/** + * INTERNAL API + */ +@InternalApi +final private[pekko] class EventsBySliceStage[Event]( + entityType: String, + minSlice: Int, + maxSlice: Int, + storage: EventStorage, + persistence: Persistence +) extends GraphStage[SourceShape[typed.EventEnvelope[Event]]] { + import EventsBySliceStage._ + + val out: Outlet[typed.EventEnvelope[Event]] = Outlet("EventsByTagSource") + override def shape: SourceShape[typed.EventEnvelope[Event]] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + new GraphStageLogicWithLogging(shape) with OutHandler { + private var state = Option.empty[State] + private var stageActorRef: ActorRef = null + override def preStart(): Unit = { + stageActorRef = getStageActor(receiveNotifications).ref + materializer.system.eventStream.subscribe(stageActorRef, classOf[SliceWrite]) + } + + private def shouldFilter(persistenceId: String): Boolean = { + val slice = persistence.sliceForPersistenceId(persistenceId) + PersistenceId.extractEntityType(persistenceId) == entityType && slice >= minSlice && slice <= maxSlice + } + + private def receiveNotifications(in: (ActorRef, Any)): Unit = { + val (_, msg) = in + (msg, state) match { + case (SliceWrite(persistenceId, timestamp, highestSequenceNr), maybeState) + if shouldFilter(persistenceId) && maybeState.forall(_.isAfter(timestamp, highestSequenceNr)) => + tryPush() + case _ => + } + } + + private def tryPush(): Unit = { + if (isAvailable(out)) { + val maybeNextEvent = storage.tryRead(entityType, repr => shouldFilter(repr.persistenceId)) + .sortBy(pr => (pr.timestamp, pr.sequenceNr)) + .find { pr => + state.forall(_.isAfter(pr.timestamp, pr.sequenceNr)) + } + + log.debug("tryPush available. State {} event {}", state, maybeNextEvent) + + maybeNextEvent.foreach { pr => + val slice = persistence.sliceForPersistenceId(pr.persistenceId) + push(out, + new typed.EventEnvelope[Event](Sequence(pr.sequenceNr), pr.persistenceId, pr.sequenceNr, + Some(pr.payload.asInstanceOf[Event]), pr.timestamp, pr.metadata, entityType, slice)) + + state = Some(State(pr.timestamp, pr.sequenceNr)) + } + } else { + log.debug("tryPush, no demand") + } + } + + override def onPull(): Unit = { + tryPush() + } + + setHandler(out, this) + } + + } + +} diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala index 343a93ea8c..b004e84477 100644 --- a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala @@ -36,9 +36,11 @@ import org.slf4j.LoggerFactory import pekko.persistence.Persistence import pekko.persistence.query.typed import pekko.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery +import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery import pekko.persistence.typed.PersistenceId import pekko.persistence.query.scaladsl.EventsByTagQuery import pekko.persistence.testkit.query.internal.EventsByTagStage +import pekko.persistence.testkit.query.internal.EventsBySliceStage import scala.collection.immutable @@ -53,7 +55,8 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c with CurrentEventsByTagQuery with CurrentEventsBySliceQuery with PagedPersistenceIdsQuery - with EventsByTagQuery { + with EventsByTagQuery + with EventsBySliceQuery { private val log = LoggerFactory.getLogger(getClass) @@ -167,4 +170,16 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c } Source.fromGraph(new EventsByTagStage(tag, storage)) } + + override def eventsBySlices[Event]( + entityType: String, + minSlice: Int, + maxSlice: Int, + offset: Offset + ): Source[typed.EventEnvelope[Event], NotUsed] = { + if (offset != NoOffset) { + throw new UnsupportedOperationException("Offsets not supported for persistence test kit eventsBySlices yet") + } + Source.fromGraph(new EventsBySliceStage(entityType, minSlice, maxSlice, storage, persistence)) + } } diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsBySliceSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsBySliceSpec.scala new file mode 100644 index 0000000000..3affd9af4b --- /dev/null +++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsBySliceSpec.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.testkit.query + +import scala.collection.immutable.Seq +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import org.apache.pekko +import pekko.persistence.Persistence +import pekko.persistence.query.NoOffset +import pekko.persistence.testkit.internal.InMemStorageExtension +import pekko.Done +import pekko.actor.testkit.typed.scaladsl.LogCapturing +import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import pekko.actor.typed.ActorRef +import pekko.persistence.query.typed.EventEnvelope +import pekko.persistence.query.PersistenceQuery +import pekko.persistence.testkit.PersistenceTestKitPlugin +import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import pekko.persistence.typed.PersistenceId +import pekko.persistence.typed.scaladsl.Effect +import pekko.persistence.typed.scaladsl.EventSourcedBehavior +import pekko.stream.testkit.scaladsl.TestSink +import org.scalatest.BeforeAndAfterEach +import org.scalatest.wordspec.AnyWordSpecLike + +object EventsBySliceSpec { + val config = PersistenceTestKitPlugin.config.withFallback( + ConfigFactory.parseString(""" + pekko.loglevel = DEBUG + pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"] + pekko.persistence.testkit.events.serialize = off + """)) + + case class Command(evt: Seq[String], ack: ActorRef[Done]) + object Command { + def apply(evt: String, ack: ActorRef[Done]): Command = Command( + Seq(evt), ack + ) + } + case class State() + + def testBehaviour(persistenceId: String) = { + EventSourcedBehavior[Command, String, State]( + PersistenceId.ofUniqueId(makeFullPersistenceId(persistenceId)), + State(), + (_, command) => + Effect.persist(command.evt).thenRun { _ => + command.ack ! Done + }, + (state, _) => state + ) + } + + def makeFullPersistenceId(persistenceId: String) = { + s"Test|$persistenceId" + } +} + +class EventsBySliceSpec + extends ScalaTestWithActorTestKit(EventsByPersistenceIdSpec.config) + with LogCapturing + with AnyWordSpecLike + with BeforeAndAfterEach { + import EventsBySliceSpec._ + + implicit val classic: pekko.actor.ActorSystem = system.classicSystem + + val queries = + PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier) + + def setup(persistenceId: String): ActorRef[Command] = { + val probe = createTestProbe[Done]() + val ref = setupEmpty(persistenceId) + ref ! Command(s"$persistenceId-1", probe.ref) + ref ! Command(s"$persistenceId-2", probe.ref) + ref ! Command(s"$persistenceId-3", probe.ref) + probe.expectMessage(Done) + probe.expectMessage(Done) + probe.expectMessage(Done) + ref + } + + def setupBatched(persistenceId: String): ActorRef[Command] = { + val probe = createTestProbe[Done]() + val ref = setupEmpty(persistenceId) + ref ! Command(Seq(s"$persistenceId-1", s"$persistenceId-2", s"$persistenceId-3"), probe.ref) + probe.expectMessage(Done) + ref + } + + def setupEmpty(persistenceId: String): ActorRef[Command] = { + spawn(testBehaviour(persistenceId)) + } + + private lazy val persistence = Persistence(system) + private lazy val numberOfSlices = persistence.numberOfSlices + + private lazy val eventStorage = InMemStorageExtension(system).storageFor(PersistenceTestKitPlugin.PluginId) + + override protected def beforeEach(): Unit = { + super.beforeEach() + eventStorage.clearAll() + } + + "Persistent test kit live query EventsBySlice" must { + "find new events" in { + val ackProbe = createTestProbe[Done]() + val ref = setup("c") + val src = queries.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset) + val probe = src.map(_.event).runWith(TestSink.probe[String]).request(5).expectNext("c-1", "c-2", "c-3") + + ref ! Command("c-4", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.expectNext("c-4") + } + + "find new events after batched setup" in { + val ackProbe = createTestProbe[Done]() + val ref = setupBatched("d") + val src = queries.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset) + val probe = src.map(_.event).runWith(TestSink.probe[String]).request(5).expectNext("d-1", "d-2", "d-3") + + ref ! Command("d-4", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.expectNext("d-4") + } + + "find new events after demand request" in { + val ackProbe = createTestProbe[Done]() + val ref = setup("e") + val src = queries.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset) + val probe = + src.map(_.event).runWith(TestSink.probe[String]).request(2).expectNext("e-1", "e-2").expectNoMessage(100.millis) + + ref ! Command("e-4", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.expectNoMessage(100.millis).request(5).expectNext("e-3").expectNext("e-4") + } + + "include timestamp in EventEnvelope" in { + setup("n") + + val src = queries.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset) + val probe = src.runWith(TestSink.probe[EventEnvelope[String]]) + + probe.request(5) + probe.expectNext().timestamp should be > 0L + probe.expectNext().timestamp should be > 0L + probe.cancel() + } + + "not complete for empty slice" in { + val ackProbe = createTestProbe[Done]() + val src = queries.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset) + val probe = + src.map(_.event).runWith(TestSink.probe[String]).request(2) + + probe.expectNoMessage(200.millis) // must not complete + + val ref = setupEmpty("o") + ref ! Command("o-1", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.cancel() + } + + "find new events in order that they were persisted when the slice range is used by multiple persistence IDs" in { + val ackProbe = createTestProbe[Done]() + val src = queries.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset) + val probe = src + .map { ee => + (ee.persistenceId, ee.event) + } + .runWith(TestSink.probe[(String, Any)]) + + val ref2 = setupEmpty("f2") + ref2 ! Command(Seq("f2-1", "f2-2"), ackProbe.ref) + ackProbe.expectMessage(Done) + probe.request(2).expectNextN(Seq((makeFullPersistenceId("f2"), "f2-1"), (makeFullPersistenceId("f2"), "f2-2"))) + + val ref1 = setupEmpty("f1") + ref1 ! Command(Seq("f1-1", "f1-2"), ackProbe.ref) + ackProbe.expectMessage(Done) + probe.request(2).expectNextN(Seq((makeFullPersistenceId("f1"), "f1-1"), (makeFullPersistenceId("f1"), "f1-2"))) + } + + "find new events only for slices that were requested" in { + val persistenceId1 = "g" + val persistenceId2 = "h" + + val slice1 = persistence.sliceForPersistenceId(makeFullPersistenceId(persistenceId1)) + val slice2 = persistence.sliceForPersistenceId(makeFullPersistenceId(persistenceId2)) + slice1 should not be slice2 + + setup(persistenceId1) + setup(persistenceId2) + + def assertSlice(slice: Int, persistenceId: String) = { + val src = queries.eventsBySlices[String]("Test", slice, slice, NoOffset) + val fullPersistenceId = makeFullPersistenceId(persistenceId) + src + .map { ee => + (ee.persistenceId, ee.event) + } + .runWith(TestSink.probe[(String, Any)]) + .request(3) + .expectNextN(Seq((fullPersistenceId, s"$persistenceId-1"), (fullPersistenceId, s"$persistenceId-2"), + (fullPersistenceId, s"$persistenceId-3"))) + } + + assertSlice(slice1, persistenceId1) + assertSlice(slice2, persistenceId2) + } + } +}