Skip to content

Commit

Permalink
feat: updateTimestampOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 19, 2023
1 parent 0390b64 commit f2813ed
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/

package akka.projection
import java.time.Instant

import akka.actor.typed.scaladsl.LoggerOps
import scala.util.Failure
import scala.util.Success
Expand All @@ -19,6 +21,7 @@ import akka.actor.typed.scaladsl.StashBuffer
import akka.annotation.InternalApi
import akka.projection.internal.ManagementState
import akka.projection.scaladsl.ProjectionManagement
import akka.projection.scaladsl.ProjectionManagement.UpdateTimestampOffset

object ProjectionBehavior {

Expand Down Expand Up @@ -48,6 +51,12 @@ object ProjectionBehavior {
extends ProjectionManagementCommand
final case class SetOffsetResult[Offset](replyTo: ActorRef[Done]) extends ProjectionManagementCommand

final case class SetTimestampOffset(
projectionId: ProjectionId,
updates: Set[UpdateTimestampOffset],
replyTo: ActorRef[Done])
extends ProjectionManagementCommand

final case class IsPaused(projectionId: ProjectionId, replyTo: ActorRef[Boolean])
extends ProjectionManagementCommand
final case class SetPaused(projectionId: ProjectionId, paused: Boolean, replyTo: ActorRef[Done])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package akka.projection.scaladsl

import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -20,14 +22,18 @@ import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.util.JavaDurationConverters._
import akka.util.Timeout

import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.time.Instant

import akka.projection.scaladsl.ProjectionManagement.UpdateTimestampOffset

object ProjectionManagement extends ExtensionId[ProjectionManagement] {
def createExtension(system: ActorSystem[_]): ProjectionManagement = new ProjectionManagement(system)

def get(system: ActorSystem[_]): ProjectionManagement = apply(system)

final case class UpdateTimestampOffset(persistenceId: String, seqNr: Long, timestamp: Instant)
}

class ProjectionManagement(system: ActorSystem[_]) extends Extension {
Expand Down Expand Up @@ -100,6 +106,24 @@ class ProjectionManagement(system: ActorSystem[_]) extends Extension {
retry(() => askSetOffset())
}

/**
* Update the stored `TimestampOffset` for the `projectionId` and restart the `Projection`.
* This can be useful if the projection was stuck with errors on a specific offset and should skip
* that offset and continue with next.
*
* Another use case is to populate the offset store with know starting points when enabling change events
* for Durable State. In that case the offset sequence number should be set to current Durable State
* revision minus 1.
*/
def updateTimestampOffset(projectionId: ProjectionId, updates: Set[UpdateTimestampOffset]): Future[Done] = {
def askSetTimestampOffset(): Future[Done] = {
topic(projectionId.name)
.ask(replyTo => Topic.Publish(SetTimestampOffset(projectionId, updates, replyTo)))
}

retry(() => askSetTimestampOffset())
}

private def retry[T](operation: () => Future[T]): Future[T] = {
def attempt(remaining: Int): Future[T] = {
operation().recoverWith {
Expand Down

0 comments on commit f2813ed

Please sign in to comment.