diff --git a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOf.scala b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOf.scala index d4cd88e0..ddd6a822 100644 --- a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOf.scala +++ b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOf.scala @@ -91,33 +91,47 @@ object EventSourcedActorOf { replay = recovering.replay - seqNr <- replay.use { replay => - // used to recover snapshot, i.e. the snapshot stored with [[snapSeqNr]] will be loaded, if any - val snapSeqNr = snapshot.map(_.metadata.seqNr).getOrElse(SeqNr.Min) - // used to recover events _following_ the snapshot OR if no snapshot available then [[SeqNr.Min]] - val fromSeqNr = snapshot.map(_.metadata.seqNr + 1).getOrElse(SeqNr.Min) - for { - _ <- log.debug(s"snapshot seqNr: $snapSeqNr, load events from seqNr: $fromSeqNr").allocated - events <- eventStore.events(fromSeqNr) - seqNrL <- events.foldWhileM(snapSeqNr) { - case (_, EventStore.Event(event, seqNr)) => replay(event, seqNr).as(seqNr.asLeft[Unit]) - case (_, EventStore.HighestSeqNr(seqNr)) => seqNr.asLeft[Unit].pure[F] - } - seqNr <- seqNrL match { - case Left(seqNr) => seqNr.pure[F] - case Right(_) => - // function used in events.foldWhileM always returns Left - // and sstream.Stream.foldWhileM returns Right only if passed function do so - // thus makes getting Right here impossible - new IllegalStateException("should never happened").raiseError[F, SeqNr] - } - } yield seqNr - }.toResource - - _ <- log.debug(s"recovery completed with seqNr $seqNr") - journaller <- eventStore.asJournaller(actorCtx, seqNr).toResource - context = Recovering.RecoveryContext(seqNr, journaller, snapshotStore.asSnapshotter) - receive <- recovering.completed(context) + seqNr <- replay + .use { replay => + // used to recover snapshot, i.e. the snapshot stored with [[snapSeqNr]] will be loaded, if any + val snapSeqNr = snapshot.map(_.metadata.seqNr).getOrElse(SeqNr.Min) + // used to recover events _following_ the snapshot OR if no snapshot available then [[SeqNr.Min]] + val fromSeqNr = snapshot.map(_.metadata.seqNr + 1).getOrElse(SeqNr.Min) + for { + _ <- log.debug(s"snapshot seqNr: $snapSeqNr, load events from seqNr: $fromSeqNr").allocated + events <- eventStore.events(fromSeqNr) + seqNrL <- events.foldWhileM(snapSeqNr) { + case (_, EventStore.Event(event, seqNr)) => replay(event, seqNr).as(seqNr.asLeft[Unit]) + case (_, EventStore.HighestSeqNr(seqNr)) => seqNr.asLeft[Unit].pure[F] + } + seqNr <- seqNrL match { + case Left(seqNr) => seqNr.pure[F] + case Right(_) => + // function used in events.foldWhileM always returns Left + // and sstream.Stream.foldWhileM returns Right only if passed function do so + // thus makes getting Right here impossible + new IllegalStateException("should never happened").raiseError[F, SeqNr] + } + } yield seqNr + } + .toResource + .attempt + + receive <- seqNr match { + case Right(seqNr) => + for { + _ <- log.debug(s"recovery completed with seqNr $seqNr") + journaller <- eventStore.asJournaller(actorCtx, seqNr).toResource + receive <- recovering.completed(seqNr, journaller, snapshotStore.asSnapshotter) + } yield receive + + case Left(error) => + for { + _ <- log.error(s"recovery failed", error) + _ <- recovering.failed(error, eventStore.asDeleteTo, snapshotStore.asSnapshotter) + receive <- error.raiseError[F, Receive[F, Envelope[Any], ActorOf.Stop]].toResource + } yield receive + } } yield receive receive.onError { @@ -158,46 +172,53 @@ object EventSourcedActorOf { implicit final private[evolutiongaming] class EventStoreOps[F[_], E](val store: EventStore[F, E]) extends AnyVal { - def asJournaller(actorCtx: ActorCtx[F], seqNr: SeqNr)(implicit F: Concurrent[F], log: Log[F]): F[Journaller[F, E]] = - for { - seqNrRef <- Ref[F].of(seqNr) - } yield new Journaller[F, E] { - val append = new Append[F, E] { - - def apply(events: Events[E]): F[F[SeqNr]] = - seqNrRef - .modify { seqNr => - events.mapAccumulate(seqNr) { - case (seqNr0, event) => - val seqNr1 = seqNr0 + 1 - seqNr1 -> EventStore.Event(event, seqNr1) - } - } - .flatMap { events => - def handleError(err: Throwable) = { - val from = events.values.head.head.seqNr - val to = events.values.last.last.seqNr - stopActor(from, to, err) - } - store - .save(events) - .onError(handleError) - .flatTap(_.onError(handleError)) - } + def asDeleteTo: DeleteEventsTo[F] = new DeleteEventsTo[F] { - private def stopActor(from: SeqNr, to: SeqNr, error: Throwable): F[Unit] = - for { - _ <- log.error(s"failed to append events with seqNr range [$from .. $to], stopping actor", error) - _ <- actorCtx.stop - } yield {} + def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr) - } + } - val deleteTo = new DeleteEventsTo[F] { + def asAppend( + actorCtx: ActorCtx[F], + seqNrRef: Ref[F, SeqNr], + )(implicit F: Concurrent[F], log: Log[F]): Append[F, E] = + new Append[F, E] { + + def apply(events: Events[E]): F[F[SeqNr]] = + seqNrRef + .modify { seqNr => + events.mapAccumulate(seqNr) { + case (seqNr0, event) => + val seqNr1 = seqNr0 + 1 + seqNr1 -> EventStore.Event(event, seqNr1) + } + } + .flatMap { events => + def handleError(err: Throwable) = { + val from = events.values.head.head.seqNr + val to = events.values.last.last.seqNr + stopActor(from, to, err) + } + store + .save(events) + .onError(handleError) + .flatTap(_.onError(handleError)) + } + + private def stopActor(from: SeqNr, to: SeqNr, error: Throwable): F[Unit] = + for { + _ <- log.error(s"failed to append events with seqNr range [$from .. $to], stopping actor", error) + _ <- actorCtx.stop + } yield {} - def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr) + } - } + def asJournaller(actorCtx: ActorCtx[F], seqNr: SeqNr)(implicit F: Concurrent[F], log: Log[F]): F[Journaller[F, E]] = + for { + seqNrRef <- Ref[F].of(seqNr) + } yield new Journaller[F, E] { + val append = asAppend(actorCtx, seqNrRef) + val deleteTo = asDeleteTo } } } diff --git a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Persistence.scala b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Persistence.scala index 9a3ca884..bf411650 100644 --- a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Persistence.scala +++ b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Persistence.scala @@ -58,8 +58,7 @@ private[akkaeffect] object Persistence { ) = { val receive = for { recovering <- recoveryStarted(seqNr, none) - context = Recovering.RecoveryContext(seqNr, journaller, snapshotter) - receive <- recovering.completed(context) + receive <- recovering.completed(seqNr, journaller, snapshotter) } yield Persistence.receive[F, S, E, C](receive) receive.toReleasable } @@ -114,9 +113,8 @@ private[akkaeffect] object Persistence { .foldMapM(_.release) .toResource .productR { - val context = Recovering.RecoveryContext(seqNr, journaller, snapshotter) recovering - .completed(context) + .completed(seqNr, journaller, snapshotter) .map(receive => Persistence.receive[F, S, E, C](receive)) } .toReleasable diff --git a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala index 8bebbf03..5e02eaf8 100644 --- a/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala +++ b/persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/Recovering.scala @@ -1,12 +1,10 @@ package com.evolutiongaming.akkaeffect.persistence -import cats.Monad import cats.effect.Resource import cats.implicits.catsSyntaxApplicativeId +import cats.{ApplicativeError, Monad} import com.evolutiongaming.akkaeffect.{Envelope, Receive} -import scala.annotation.nowarn - /** Describes "Recovery" phase * * @tparam S @@ -27,82 +25,136 @@ trait Recovering[F[_], S, E, +A] { * @see * [[akka.persistence.RecoveryCompleted]] */ - @deprecated("Use completed with RecoveryContext", "4.1.5") def completed( seqNr: SeqNr, journaller: Journaller[F, E], snapshotter: Snapshotter[F, S], ): Resource[F, A] - /** Called when recovery completed, resource will be released upon actor termination + /** Called when state was transferred via state-transfer, resource will be released upon actor termination + */ + def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ): Resource[F, A] + + /** Called when recovery failed * - * @see - * [[akka.persistence.RecoveryCompleted]] + * @param journaller + * of type [[DeleteEventsTo]] because instance of [[Journaller]] is not available in this case. [[Journaller]] can + * be created based on known [[SeqNr]], while its now known in case of failure. */ - def completed(context: Recovering.RecoveryContext[F, S, E]): Resource[F, A] = { - @nowarn("msg=deprecated") - val a = completed(context.seqNr, context.journaller, context.snapshotter) - a - } + def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S], + ): Resource[F, Unit] } object Recovering { - /** Context containing information about recovery and provides access to journaller and snapshotter - */ - trait RecoveryContext[F[_], -S, -E] { - def seqNr: SeqNr - def journaller: Journaller[F, E] - def snapshotter: Snapshotter[F, S] - def recoveredFromPersistence: Boolean - } - object RecoveryContext { - - private case class Impl[F[_], S, E]( - seqNr: SeqNr, - journaller: Journaller[F, E], - snapshotter: Snapshotter[F, S], - recoveredFromPersistence: Boolean, - ) extends RecoveryContext[F, S, E] - - def apply[F[_], S, E]( - seqNr: SeqNr, - journaller: Journaller[F, E], - snapshotter: Snapshotter[F, S], - recoveredFromPersistence: Boolean = true, - ): RecoveryContext[F, S, E] = Impl(seqNr, journaller, snapshotter, recoveredFromPersistence) - - } - - def apply[S]: Apply[S] = new Apply[S] + @deprecated( + "This factory provided as direct replacement of Recovering[A](...)(...) and Recovering.const[A](...)(...), avoid using it as much as possible", + "5.0.0", + ) + def legacy[S]: Legacy[S] = new Legacy[S] - final private[Recovering] class Apply[S](private val b: Boolean = true) extends AnyVal { + final class Legacy[S](private val b: Boolean = true) extends AnyVal { - @deprecated("Use apply with RecoveryContext", "4.1.7") - def apply[F[_], E, A](replay: Resource[F, Replay[F, E]])( + /** This factory method is provided as direct replacement of removed Recovering[A](...)(...) and re-used `completed` + * function for both `completed` and `transferred` cases. `failed` case re-throw the error. + */ + def apply[F[_], E, A]( + replay: Resource[F, Replay[F, E]], + )( completed: (SeqNr, Journaller[F, E], Snapshotter[F, S]) => Resource[F, A], + )(implicit + F: ApplicativeError[F, Throwable], ): Recovering[F, S, E, A] = { + val replay1 = replay val completed1 = completed + new Recovering[F, S, E, A] { + override def replay = replay1 - def replay = replay1 + override def completed( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = completed1(seqNr, journaller, snapshotter) - def completed( + override def transferred( seqNr: SeqNr, journaller: Journaller[F, E], snapshotter: Snapshotter[F, S], - ) = - completed1(seqNr, journaller, snapshotter) + ) = completed1(seqNr, journaller, snapshotter) + + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S], + ) = Resource.raiseError[F, Unit, Throwable](cause) } } - def apply1[F[_], E, A](replay: Resource[F, Replay[F, E]])( - completed: Recovering.RecoveryContext[F, S, E] => Resource[F, A], + def const[F[_], E, A]( + replay: Resource[F, Replay[F, E]], + )( + completed: Resource[F, A], + )(implicit + F: ApplicativeError[F, Throwable], ): Recovering[F, S, E, A] = { + val replay1 = replay val completed1 = completed + + new Recovering[F, S, E, A] { + override def replay = replay1 + + override def completed( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = completed1 + + override def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = completed1 + + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S], + ) = Resource.raiseError[F, Unit, Throwable](cause) + } + } + + } + + def apply[S]: Apply[S] = new Apply[S] + + final class Apply[S](private val b: Boolean = true) extends AnyVal { + + def apply[F[_], E, A]( + replay: Resource[F, Replay[F, E]], + )( + completed: (SeqNr, Journaller[F, E], Snapshotter[F, S]) => Resource[F, A], + )( + transferred: (SeqNr, Journaller[F, E], Snapshotter[F, S]) => Resource[F, A], + )( + failed: (Throwable, DeleteEventsTo[F], Snapshotter[F, S]) => Resource[F, Unit], + ): Recovering[F, S, E, A] = { + + val replay1 = replay + val completed1 = completed + val transferred1 = transferred + val failed1 = failed + new Recovering[F, S, E, A] { override def replay = replay1 @@ -111,30 +163,67 @@ object Recovering { seqNr: SeqNr, journaller: Journaller[F, E], snapshotter: Snapshotter[F, S], - ) = completed1(RecoveryContext(seqNr, journaller, snapshotter)) + ) = completed1(seqNr, journaller, snapshotter) - override def completed(ctx: RecoveryContext[F, S, E]) = completed1(ctx) + override def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = transferred1(seqNr, journaller, snapshotter) + + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S], + ) = failed1(cause, journaller, snapshotter) } + } } def const[S]: Const[S] = new Const[S] - final private[Recovering] class Const[S](private val b: Boolean = true) extends AnyVal { + final class Const[S](private val b: Boolean = true) extends AnyVal { - def apply[F[_], E, A](replay: Resource[F, Replay[F, E]])( + def apply[F[_], E, A]( + replay: Resource[F, Replay[F, E]], + )( completed: Resource[F, A], + )( + transferred: Resource[F, A], + )( + failed: Resource[F, Unit], ): Recovering[F, S, E, A] = { - val replay1 = replay - val completed1 = completed + + val replay1 = replay + val completed1 = completed + val transferred1 = transferred + val failed1 = failed + new Recovering[F, S, E, A] { - def replay = replay1 + override def replay = replay1 + + override def completed( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = completed1 + + override def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = transferred1 - def completed(seqNr: SeqNr, journaller: Journaller[F, E], snapshotter: Snapshotter[F, S]) = - completed1 + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S], + ) = failed1 } + } } @@ -144,62 +233,86 @@ object Recovering { F: Monad[F], ): Recovering[F, S1, E1, A1] = new Recovering[F, S1, E1, A1] { - def replay = self.replay.map(_.convert(e1f)) + override def replay = self.replay.map(_.convert(e1f)) - def completed( + override def completed( seqNr: SeqNr, journaller: Journaller[F, E1], snapshotter: Snapshotter[F, S1], ) = { val journaller1 = journaller.convert(ef) val snapshotter1 = snapshotter.convert(sf) - val context1 = RecoveryContext(seqNr, journaller1, snapshotter1) - self.completed(context1).flatMap(af) + self.completed(seqNr, journaller1, snapshotter1).flatMap(af) } - override def completed(context: RecoveryContext[F, S1, E1]) = { - val journaller1 = context.journaller.convert(ef) - val snapshotter1 = context.snapshotter.convert(sf) - val context1 = RecoveryContext(context.seqNr, journaller1, snapshotter1) - self.completed(context1).flatMap(af) + override def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E1], + snapshotter: Snapshotter[F, S1], + ) = { + val journaller1 = journaller.convert(ef) + val snapshotter1 = snapshotter.convert(sf) + self.transferred(seqNr, journaller1, snapshotter1).flatMap(af) + } + + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S1], + ) = { + val snapshotter1 = snapshotter.convert(sf) + self.failed(cause, journaller, snapshotter1) } } def map[A1](f: A => A1): Recovering[F, S, E, A1] = new Recovering[F, S, E, A1] { - def replay = self.replay + override def replay = self.replay - def completed( + override def completed( seqNr: SeqNr, journaller: Journaller[F, E], snapshotter: Snapshotter[F, S], - ) = { - val context = RecoveryContext(seqNr, journaller, snapshotter) - self.completed(context).map(f) - } + ) = self.completed(seqNr, journaller, snapshotter).map(f) + + override def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = self.transferred(seqNr, journaller, snapshotter).map(f) + + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S], + ) = self.failed(cause, journaller, snapshotter) - override def completed(context: RecoveryContext[F, S, E]) = - self.completed(context).map(f) } def mapM[A1]( f: A => Resource[F, A1], ): Recovering[F, S, E, A1] = new Recovering[F, S, E, A1] { - def replay = self.replay + override def replay = self.replay - def completed( + override def completed( seqNr: SeqNr, journaller: Journaller[F, E], snapshotter: Snapshotter[F, S], - ) = { - val context = RecoveryContext(seqNr, journaller, snapshotter) - self.completed(context).flatMap(f) - } + ) = self.completed(seqNr, journaller, snapshotter).flatMap(f) - override def completed(context: RecoveryContext[F, S, E]) = - self.completed(context).flatMap(f) + override def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E], + snapshotter: Snapshotter[F, S], + ) = self.transferred(seqNr, journaller, snapshotter).flatMap(f) + + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S], + ) = self.failed(cause, journaller, snapshotter) } } @@ -212,16 +325,25 @@ object Recovering { ): Recovering[F, S1, E1, Receive[F, Envelope[C1], Boolean]] = new Recovering[F, S1, E1, Receive[F, Envelope[C1], Boolean]] { - def replay = self.replay.map(_.convert(ef)) + override def replay = self.replay.map(_.convert(ef)) - def completed( + override def completed( seqNr: SeqNr, journaller: Journaller[F, E1], snapshotter: Snapshotter[F, S1], - ) = { - val context = RecoveryContext(seqNr, journaller, snapshotter) - self.completed(context).map(_.convert(cf, _.pure[F])) - } + ) = self.completed(seqNr, journaller, snapshotter).map(_.convert(cf, _.pure[F])) + + override def transferred( + seqNr: SeqNr, + journaller: Journaller[F, E1], + snapshotter: Snapshotter[F, S1], + ) = self.transferred(seqNr, journaller, snapshotter).map(_.convert(cf, _.pure[F])) + + override def failed( + cause: Throwable, + journaller: DeleteEventsTo[F], + snapshotter: Snapshotter[F, S1], + ) = self.failed(cause, journaller, snapshotter) } def typeless(ef: Any => F[E], cf: Any => F[C])(implicit diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala index f058ff24..9a4e6d44 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOfTest.scala @@ -10,6 +10,7 @@ import cats.effect.unsafe.implicits.global import cats.syntax.all.* import com.evolutiongaming.akkaeffect.IOSuite.* import com.evolutiongaming.akkaeffect.persistence.InstrumentEventSourced.Action +import com.evolutiongaming.akkaeffect.persistence.SeqNr import com.evolutiongaming.akkaeffect.testkit.Probe import com.evolutiongaming.akkaeffect.{ActorSuite, *} import com.evolutiongaming.catshelper.CatsHelper.* @@ -18,6 +19,7 @@ import org.scalatest.funsuite.AsyncFunSuite import org.scalatest.matchers.should.Matchers import java.time.Instant +import scala.annotation.nowarn import scala.concurrent.duration.* import scala.reflect.ClassTag @@ -65,6 +67,10 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher `recoveryCompleted stops`[IO](actorSystem).run() } + test("recoveryFailed") { + `recoveryFailed`[IO](actorSystem).run() + } + test("append many") { `append many`[IO](actorSystem).run() } @@ -98,54 +104,55 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher val recoveryStarted = RecoveryStarted .const { - Recovering[State] - .apply1 { + Recovering + .legacy[State] { Replay.empty[F, Event].pure[Resource[F, *]] - } { recoveringCtx => - for { - stateRef <- Ref[F].of(0).toResource - } yield Receive[Envelope[Cmd]] { envelope => - val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) - - envelope.msg match { - case a: Cmd.WithCtx[_] => - for { - a <- a.f(actorCtx) - _ <- reply(a) - } yield false - - case Cmd.Inc => - for { - seqNr <- recoveringCtx.journaller.append(Events.of("a")).flatten - _ <- stateRef.update(_ + 1) - state <- stateRef.get - result <- recoveringCtx.snapshotter.save(seqNr, state) - seqNr <- recoveringCtx.journaller - .append(Events.batched(Nel.of("b"), Nel.of("c", "d"))) - .flatten - _ <- result - _ <- stateRef.update(_ + 1) - _ <- reply(seqNr) - } yield false - - case Cmd.Stop => - for { - _ <- reply("stopping") - } yield true - } - } { + } { + case (_, journaller, snapshotter) => for { - _ <- actorCtx.setReceiveTimeout(Duration.Inf) - _ <- receiveTimeout - } yield false - } - .contramapM[Envelope[Any]] { envelope => - envelope.msg - .castM[F, Cmd] - .map(a => envelope.copy(msg = a)) + stateRef <- Ref[F].of(0).toResource + } yield Receive[Envelope[Cmd]] { envelope => + val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) + + envelope.msg match { + case a: Cmd.WithCtx[_] => + for { + a <- a.f(actorCtx) + _ <- reply(a) + } yield false + + case Cmd.Inc => + for { + seqNr <- journaller.append(Events.of("a")).flatten + _ <- stateRef.update(_ + 1) + state <- stateRef.get + result <- snapshotter.save(seqNr, state) + seqNr <- journaller + .append(Events.batched(Nel.of("b"), Nel.of("c", "d"))) + .flatten + _ <- result + _ <- stateRef.update(_ + 1) + _ <- reply(seqNr) + } yield false + + case Cmd.Stop => + for { + _ <- reply("stopping") + } yield true + } + } { + for { + _ <- actorCtx.setReceiveTimeout(Duration.Inf) + _ <- receiveTimeout + } yield false } + .contramapM[Envelope[Any]] { envelope => + envelope.msg + .castM[F, Cmd] + .map(a => envelope.copy(msg = a)) + } } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } .pure[Resource[F, *]] EventSourced(EventSourcedId("id"), value = recoveryStarted).pure[F] @@ -246,7 +253,8 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher val recoveryStarted = { val started = RecoveryStarted[S] { (_, _) => Recovering - .const[S] { + .legacy[S] + .const { Replay.empty[F, E].pure[Resource[F, *]] } { startedDeferred @@ -254,7 +262,7 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher .toResource .as(Receive.const[Envelope[C]](false.pure[F])) } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource .make(().pure[F])(_ => stoppedDeferred.complete(()).void) @@ -304,18 +312,19 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -394,20 +403,21 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - seqNr <- recoveringCtx.journaller.append(Events.of(1)).flatten - _ <- recoveringCtx.journaller.deleteTo(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + seqNr <- journaller.append(Events.of(1)).flatten + _ <- journaller.deleteTo(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -498,17 +508,18 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - _ <- recoveringCtx.journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, _) => + val receive = for { + _ <- journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -584,20 +595,21 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - _ <- recoveringCtx.journaller.append(Events.of(1)).flatten - _ <- recoveringCtx.snapshotter.delete(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + _ <- journaller.append(Events.of(1)).flatten + _ <- snapshotter.delete(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -690,20 +702,21 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - _ <- recoveringCtx.journaller.append(Events.of(1)).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + _ <- journaller.append(Events.of(1)).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -792,17 +805,18 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher .make(delay productR actorCtx.stop)(_ => stopped.complete(()).void) .as { RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay .empty[F, E] .pure[Resource[F, *]] - } { _ => - Receive - .const[Envelope[C]](false.pure[F]) - .pure[Resource[F, *]] + } { + case (_, _, _) => + Receive + .const[Envelope[C]](false.pure[F]) + .pure[Resource[F, *]] } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } } EventSourced(EventSourcedId("10"), value = recoveryStarted).pure[F] @@ -862,15 +876,17 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher Resource .make(lock.get productR actorCtx.stop)(_ => stopped.complete(()).void) .as { - Recovering.const[S] { - Replay - .empty[F, E] - .pure[Resource[F, *]] - } { - Receive - .const[Envelope[C]](false.pure[F]) - .pure[Resource[F, *]] - } + Recovering + .legacy[S] + .const { + Replay + .empty[F, E] + .pure[Resource[F, *]] + } { + Receive + .const[Envelope[C]](false.pure[F]) + .pure[Resource[F, *]] + }: @nowarn } } .pure[Resource[F, *]] @@ -926,7 +942,8 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher RecoveryStarted .const { Recovering - .const[S] { + .legacy[S] + .const { Replay .empty[F, E] .pure[Resource[F, *]] @@ -935,7 +952,7 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher .make(lock.get productR actorCtx.stop)(_ => stopped.complete(()).void) .as(Receive.const[Envelope[C]](false.pure[F])) } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } .pure[Resource[F, *]] EventSourced(EventSourcedId("5"), value = recoveryStarted).pure[F] @@ -974,6 +991,90 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher } yield {} } + private def `recoveryFailed`[F[_]: Async: ToFuture: FromFuture: ToTry: LogOf]( + actorSystem: ActorSystem, + ): F[Unit] = { + + val actorRefOf = ActorRefOf.fromActorRefFactory[F](actorSystem) + + type S = Unit + type C = Any + type E = Int + + def eventSourced[A](value: A) = EventSourced(EventSourcedId("5.1"), value = value) + + val recoveryError = new IllegalArgumentException("test error: cannot apply event on state") + + def eventSourcedOf( + lock: Deferred[F, Unit], + stopped: Deferred[F, Unit], + ) = + EventSourcedOf[F] { actorCtx => + val recoveryStarted = + RecoveryStarted + .const { + Recovering[S] { + Replay + .const[E](recoveryError.raiseError[F, Unit]) + .pure[Resource[F, *]] + } { + case (_, _, _) => + Resource + .eval(Async[F].delay(fail("Recovering.completed should not be called"))) + .as(Receive.const[Envelope[C]](false.pure[F])) + } { + case (_, _, _) => + Resource + .eval(Async[F].delay(fail("Recovering.transferred should not be called"))) + .as(Receive.const[Envelope[C]](false.pure[F])) + } { + case (_, _, _) => + Resource + .make(lock.get productR actorCtx.stop)(_ => stopped.complete(()).void) + .void + }.pure[Resource[F, *]] + } + .pure[Resource[F, *]] + eventSourced[ + Resource[F, RecoveryStarted[F, S, E, Receive[F, Envelope[Any], ActorOf.Stop]]], + ](recoveryStarted).pure[F] + } + + for { + lock <- Deferred[F, Unit] + stopped <- Deferred[F, Unit] + actions <- Ref[F].of(List.empty[Action[S, C, E]]) + eventSourcedOf <- InstrumentEventSourced(actions, eventSourcedOf(lock, stopped)) + .typeless(_.castM[F, S], _.castM[F, E], _.pure[F]) + .pure[F] + persistence <- persistence[F].pure[F] + eventStore <- persistence.eventStore(eventSourced {}) + seqNr <- eventStore.save(Events.of(EventStore.Event(42, 1L))).flatten + actorEffect = EventSourcedActorEffect.of(actorRefOf, eventSourcedOf, persistence) + actorEffect <- actorEffect.allocated.map { case (actorEffect, _) => actorEffect } + _ <- Probe.of(actorRefOf).use { probe => + for { + terminated <- probe.watch(actorEffect.toUnsafe) + _ <- lock.complete(()) + _ <- terminated + } yield {} + } + _ <- stopped.get + _ <- Async[F].sleep(10.millis) // Make sure all actions are performed first + actions <- actions.get + _ = actions.reverse shouldEqual List( + Action.Created(EventSourcedId("5.1"), akka.persistence.Recovery(), PluginIds.Empty), + Action.Started, + Action.RecoveryAllocated(0L, none), + Action.ReplayAllocated, + Action.ReplayReleased, + Action.RecoveryFailed(recoveryError), + Action.RecoveryReleased, + Action.Released, + ) + } yield {} + } + private def `append many`[F[_]: Async: ToFuture: FromFuture: ToTry: LogOf]( actorSystem: ActorSystem, ): F[Unit] = { @@ -995,30 +1096,31 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher for { stateRef <- Ref[F].of(true).toResource - } yield Recovering[S].apply1 { + } yield Recovering.legacy[S] { Replay.const[E](stateRef.set(false)).pure[Resource[F, *]] - } { recoveringCtx => - def append: F[Unit] = - for { - state <- stateRef.get - result <- - if (state) { - events - .traverse { event => - recoveringCtx.journaller.append(Events.of(event)) - } - .flatMap(_.foldMapM(_.void)) - } else { - ().pure[F] - } - } yield result + } { + case (_, journaller, _) => + def append: F[Unit] = + for { + state <- stateRef.get + result <- + if (state) { + events + .traverse { event => + journaller.append(Events.of(event)) + } + .flatMap(_.foldMapM(_.void)) + } else { + ().pure[F] + } + } yield result - val receive = for { - _ <- append - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - } + val receive = for { + _ <- append + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + }: @nowarn } Resource .make(().pure[F])(_ => stoppedDeferred.complete(()).void) @@ -1102,19 +1204,21 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher } yield RecoveryStarted.const { for { _ <- actorCtx.setReceiveTimeout(10.millis).toResource - } yield Recovering.const[S] { - Replay - .empty[F, E] - .pure[Resource[F, *]] - } { - for { - _ <- actorCtx.setReceiveTimeout(10.millis).toResource - } yield Receive[Envelope[C]] { _ => - false.pure[F] + } yield Recovering + .legacy[S] + .const { + Replay + .empty[F, E] + .pure[Resource[F, *]] } { - timedOut.complete(()).as(true) - } - } + for { + _ <- actorCtx.setReceiveTimeout(10.millis).toResource + } yield Receive[Envelope[C]] { _ => + false.pure[F] + } { + timedOut.complete(()).as(true) + } + }: @nowarn } EventSourced(EventSourcedId("9"), value = recoveryStarted) } diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala index 6c3c5f49..2d8ee4c8 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/InstrumentEventSourced.scala @@ -2,6 +2,7 @@ package com.evolutiongaming.akkaeffect.persistence import akka.actor.ActorRef import akka.persistence.{Recovery, SnapshotSelectionCriteria} +import cats.effect.syntax.resource.* import cats.effect.{Ref, Resource, Sync} import cats.syntax.all.* import com.evolutiongaming.akkaeffect.* @@ -40,23 +41,21 @@ object InstrumentEventSourced { snapshotOffer.copy(metadata = metadata) } - for { - recovering <- recoveryStarted(seqNr, snapshotOffer) - _ <- resource(Action.RecoveryAllocated(seqNr, snapshotOffer1), Action.RecoveryReleased) - } yield Recovering[S].apply1 { - for { - _ <- resource(Action.ReplayAllocated, Action.ReplayReleased) - replay <- recovering.replay - } yield Replay[E] { (event, seqNr) => - for { - _ <- replay(event, seqNr) - _ <- record(Action.Replayed(event, seqNr)) - } yield {} + def instrumentedDeleteEventsTo(deleteTo: DeleteEventsTo[F]) = + new Instrument with DeleteEventsTo[F] { + def apply(seqNr: SeqNr) = + for { + _ <- record(Action.DeleteEventsTo(seqNr)) + a <- deleteTo(seqNr) + _ <- record(Action.DeleteEventsToOuter) + } yield for { + a <- a + _ <- record(Action.DeleteEventsToInner) + } yield a } - } { recoveringCtx => - import recoveringCtx.* - val journaller1 = new Instrument with Journaller[F, E] { + def instrumentedJournaller(journaller: Journaller[F, E]) = + new Instrument with Journaller[F, E] { def append = events => for { @@ -79,7 +78,8 @@ object InstrumentEventSourced { } yield a } - val snapshotter1 = new Instrument with Snapshotter[F, S] { + def instrumentedSnapshotter(snapshotter: Snapshotter[F, S]) = + new Instrument with Snapshotter[F, S] { def save(seqNr: SeqNr, snapshot: S) = for { @@ -112,13 +112,8 @@ object InstrumentEventSourced { } yield a } - for { - context <- Recovering - .RecoveryContext(recoveringCtx.seqNr, journaller1, snapshotter1, recoveredFromPersistence) - .pure[Resource[F, *]] - receive <- recovering.completed(context) - _ <- resource(Action.ReceiveAllocated(recoveringCtx.seqNr), Action.ReceiveReleased) - } yield Receive[Envelope[C]] { envelope => + def instrumentedReceive(receive: Receive[F, Envelope[C], ActorOf.Stop]) = + Receive[Envelope[C]] { envelope => for { stop <- receive(envelope) _ <- record(Action.Received(envelope.msg, envelope.from, stop)) @@ -129,6 +124,47 @@ object InstrumentEventSourced { _ <- record(Action.ReceiveTimeout) } yield stop } + + for { + recovering <- recoveryStarted(seqNr, snapshotOffer) + _ <- resource(Action.RecoveryAllocated(seqNr, snapshotOffer1), Action.RecoveryReleased) + } yield Recovering[S] { + for { + _ <- resource(Action.ReplayAllocated, Action.ReplayReleased) + replay <- recovering.replay + } yield Replay[E] { (event, seqNr) => + for { + _ <- replay(event, seqNr) + _ <- record(Action.Replayed(event, seqNr)) + } yield {} + } + } { + case (seqNr, journaller, snapshotter) => + val journaller1 = instrumentedJournaller(journaller) + val snapshotter1 = instrumentedSnapshotter(snapshotter) + + for { + receive <- recovering.completed(seqNr, journaller1, snapshotter1) + _ <- resource(Action.ReceiveAllocated(seqNr), Action.ReceiveReleased) + } yield instrumentedReceive(receive) + } { + case (seqNr, journaller, snapshotter) => + val journaller1 = instrumentedJournaller(journaller) + val snapshotter1 = instrumentedSnapshotter(snapshotter) + + for { + receive <- recovering.transferred(seqNr, journaller1, snapshotter1) + _ <- resource(Action.ReceiveAllocated(seqNr), Action.ReceiveReleased) + } yield instrumentedReceive(receive) + } { + case (error, journaller, snapshotter) => + val journaller1 = instrumentedDeleteEventsTo(journaller) + val snapshotter1 = instrumentedSnapshotter(snapshotter) + + for { + _ <- recovering.failed(error, journaller1, snapshotter1) + _ <- record(Action.RecoveryFailed(error)).toResource + } yield {} } } } @@ -194,6 +230,8 @@ object InstrumentEventSourced { final case class ReceiveAllocated[S](seqNr: SeqNr) extends Action[S, Nothing, Nothing] + final case class RecoveryFailed(cause: Throwable) extends Action[Nothing, Nothing, Nothing] + final case object ReceiveReleased extends Action[Nothing, Nothing, Nothing] final case class Received[C]( diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala index 397d910b..f5ee70d2 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistenceFailureTest.scala @@ -12,6 +12,7 @@ import com.evolutiongaming.catshelper.LogOf import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import scala.annotation.nowarn import scala.concurrent.Future import scala.concurrent.duration.* import scala.util.Try @@ -32,18 +33,19 @@ class PersistenceFailureTest extends AnyFunSuite with Matchers { eventSourcedId = EventSourcedId("test"), pluginIds = PluginIds(journal, snapshot), value = RecoveryStarted[Unit] { (_, _) => - Recovering[Unit] - .apply1 { + Recovering + .legacy[Unit] { Replay.empty[IO, Event].pure[Resource[IO, *]] - } { recoveringCtx => - Receive[Envelope[Event]] { envelope => - // persist event in forked thread thus don't fail actor directly - recoveringCtx.journaller.append(Events.of(envelope.msg)).flatten.start.as(false) - } { - IO(true) - }.pure[Resource[IO, *]] + } { + case (_, journaller, _) => + Receive[Envelope[Event]] { envelope => + // persist event in forked thread thus don't fail actor directly + journaller.append(Events.of(envelope.msg)).flatten.start.as(false) + } { + IO(true) + }.pure[Resource[IO, *]] } - .pure[Resource[IO, *]] + .pure[Resource[IO, *]]: @nowarn }.typeless( sf = _ => IO.unit, ef = e => IO(e.asInstanceOf[Event]), diff --git a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala index d4b82341..2c98a5d7 100644 --- a/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala +++ b/persistence/src/test/scala/com/evolutiongaming/akkaeffect/persistence/PersistentActorOfTest.scala @@ -19,6 +19,7 @@ import org.scalatest.funsuite.AsyncFunSuite import org.scalatest.matchers.should.Matchers import java.time.Instant +import scala.annotation.nowarn import scala.concurrent.duration.* import scala.reflect.ClassTag @@ -94,54 +95,55 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers val recoveryStarted = RecoveryStarted .const { - Recovering[State] - .apply1 { + Recovering + .legacy[State] { Replay.empty[F, Event].pure[Resource[F, *]] - } { recoveringCtx => - for { - stateRef <- Ref[F].of(0).toResource - } yield Receive[Envelope[Cmd]] { envelope => - val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) - - envelope.msg match { - case a: Cmd.WithCtx[_] => - for { - a <- a.f(actorCtx) - _ <- reply(a) - } yield false - - case Cmd.Inc => - for { - seqNr <- recoveringCtx.journaller.append(Events.of("a")).flatten - _ <- stateRef.update(_ + 1) - state <- stateRef.get - result <- recoveringCtx.snapshotter.save(seqNr, state) - seqNr <- recoveringCtx.journaller - .append(Events.batched(Nel.of("b"), Nel.of("c", "d"))) - .flatten - _ <- result - _ <- stateRef.update(_ + 1) - _ <- reply(seqNr) - } yield false - - case Cmd.Stop => - for { - _ <- reply("stopping") - } yield true - } - } { + } { + case (_, journaller, snapshotter) => for { - _ <- actorCtx.setReceiveTimeout(Duration.Inf) - _ <- receiveTimeout - } yield false - } - .contramapM[Envelope[Any]] { envelope => - envelope.msg - .castM[F, Cmd] - .map(a => envelope.copy(msg = a)) + stateRef <- Ref[F].of(0).toResource + } yield Receive[Envelope[Cmd]] { envelope => + val reply = Reply.fromActorRef[F](to = envelope.from, from = actorCtx.self) + + envelope.msg match { + case a: Cmd.WithCtx[_] => + for { + a <- a.f(actorCtx) + _ <- reply(a) + } yield false + + case Cmd.Inc => + for { + seqNr <- journaller.append(Events.of("a")).flatten + _ <- stateRef.update(_ + 1) + state <- stateRef.get + result <- snapshotter.save(seqNr, state) + seqNr <- journaller + .append(Events.batched(Nel.of("b"), Nel.of("c", "d"))) + .flatten + _ <- result + _ <- stateRef.update(_ + 1) + _ <- reply(seqNr) + } yield false + + case Cmd.Stop => + for { + _ <- reply("stopping") + } yield true + } + } { + for { + _ <- actorCtx.setReceiveTimeout(Duration.Inf) + _ <- receiveTimeout + } yield false } + .contramapM[Envelope[Any]] { envelope => + envelope.msg + .castM[F, Cmd] + .map(a => envelope.copy(msg = a)) + } } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } .pure[Resource[F, *]] EventSourced(EventSourcedId("id"), value = recoveryStarted).pure[F] @@ -240,7 +242,8 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers val recoveryStarted = { val started = RecoveryStarted[S] { (_, _) => Recovering - .const[S] { + .legacy[S] + .const { Replay.empty[F, E].pure[Resource[F, *]] } { startedDeferred @@ -248,7 +251,7 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers .toResource .as(Receive.const[Envelope[C]](false.pure[F])) } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource .make(().pure[F])(_ => stoppedDeferred.complete(()).void) @@ -296,18 +299,19 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -382,20 +386,21 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - seqNr <- recoveringCtx.journaller.append(Events.of(1)).flatten - _ <- recoveringCtx.journaller.deleteTo(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + seqNr <- journaller.append(Events.of(1)).flatten + _ <- journaller.deleteTo(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -482,17 +487,18 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - _ <- recoveringCtx.journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, _) => + val receive = for { + _ <- journaller.append(Events.batched(Nel.of(0, 1), Nel.of(2))).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -566,20 +572,21 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - _ <- recoveringCtx.journaller.append(Events.of(1)).flatten - _ <- recoveringCtx.snapshotter.delete(seqNr).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + _ <- journaller.append(Events.of(1)).flatten + _ <- snapshotter.delete(seqNr).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -670,20 +677,21 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers EventSourcedOf.const { val recoveryStarted = { val started = RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay.empty[F, E].pure[Resource[F, *]] - } { recoveringCtx => - val receive = for { - seqNr <- recoveringCtx.journaller.append(Events.of(0)).flatten - _ <- recoveringCtx.snapshotter.save(seqNr, 1).flatten - _ <- recoveringCtx.journaller.append(Events.of(1)).flatten - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - - receive.toResource + } { + case (_, journaller, snapshotter) => + val receive = for { + seqNr <- journaller.append(Events.of(0)).flatten + _ <- snapshotter.save(seqNr, 1).flatten + _ <- journaller.append(Events.of(1)).flatten + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + + receive.toResource } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } Resource @@ -770,17 +778,18 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers .make(delay productR actorCtx.stop)(_ => stopped.complete(()).void) .as { RecoveryStarted.const { - Recovering[S] - .apply1 { + Recovering + .legacy[S] { Replay .empty[F, E] .pure[Resource[F, *]] - } { _ => - Receive - .const[Envelope[C]](false.pure[F]) - .pure[Resource[F, *]] + } { + case (_, _, _) => + Receive + .const[Envelope[C]](false.pure[F]) + .pure[Resource[F, *]] } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn } } EventSourced(EventSourcedId("10"), value = recoveryStarted).pure[F] @@ -838,15 +847,17 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers Resource .make(lock.get productR actorCtx.stop)(_ => stopped.complete(()).void) .as { - Recovering.const[S] { - Replay - .empty[F, E] - .pure[Resource[F, *]] - } { - Receive - .const[Envelope[C]](false.pure[F]) - .pure[Resource[F, *]] - } + Recovering + .legacy[S] + .const { + Replay + .empty[F, E] + .pure[Resource[F, *]] + } { + Receive + .const[Envelope[C]](false.pure[F]) + .pure[Resource[F, *]] + }: @nowarn } } .pure[Resource[F, *]] @@ -900,7 +911,8 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers RecoveryStarted .const { Recovering - .const[S] { + .legacy[S] + .const { Replay .empty[F, E] .pure[Resource[F, *]] @@ -911,7 +923,7 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers } .pure[Resource[F, *]] } - .pure[Resource[F, *]] + .pure[Resource[F, *]]: @nowarn EventSourced(EventSourcedId("5"), value = recoveryStarted).pure[F] } @@ -967,30 +979,31 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers for { stateRef <- Ref[F].of(true).toResource - } yield Recovering[S].apply1 { + } yield Recovering.legacy[S] { Replay.const[E](stateRef.set(false)).pure[Resource[F, *]] - } { recoveringCtx => - def append: F[Unit] = - for { - state <- stateRef.get - result <- - if (state) { - events - .traverse { event => - recoveringCtx.journaller.append(Events.of(event)) - } - .flatMap(_.foldMapM(_.void)) - } else { - ().pure[F] - } - } yield result + } { + case (_, journaller, _) => + def append: F[Unit] = + for { + state <- stateRef.get + result <- + if (state) { + events + .traverse { event => + journaller.append(Events.of(event)) + } + .flatMap(_.foldMapM(_.void)) + } else { + ().pure[F] + } + } yield result - val receive = for { - _ <- append - _ <- startedDeferred.complete(()) - } yield Receive.const[Envelope[C]](false.pure[F]) - receive.toResource - } + val receive = for { + _ <- append + _ <- startedDeferred.complete(()) + } yield Receive.const[Envelope[C]](false.pure[F]) + receive.toResource + }: @nowarn } Resource .make(().pure[F])(_ => stoppedDeferred.complete(()).void) @@ -1072,19 +1085,21 @@ class PersistentActorOfTest extends AsyncFunSuite with ActorSuite with Matchers } yield RecoveryStarted.const { for { _ <- actorCtx.setReceiveTimeout(10.millis).toResource - } yield Recovering.const[S] { - Replay - .empty[F, E] - .pure[Resource[F, *]] - } { - for { - _ <- actorCtx.setReceiveTimeout(10.millis).toResource - } yield Receive[Envelope[C]] { _ => - false.pure[F] + } yield Recovering + .legacy[S] + .const { + Replay + .empty[F, E] + .pure[Resource[F, *]] } { - timedOut.complete(()).as(true) - } - } + for { + _ <- actorCtx.setReceiveTimeout(10.millis).toResource + } yield Receive[Envelope[C]] { _ => + false.pure[F] + } { + timedOut.complete(()).as(true) + } + }: @nowarn } EventSourced(EventSourcedId("9"), value = recoveryStarted) }