From 6f61cbed9d6d9798c3619f77f0667a71d1bf9c24 Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Thu, 28 Nov 2024 10:32:13 -0500 Subject: [PATCH] fix: don't set idle when backtracking, check for forward behind latest backtracking (#104) * fix: don't set idle when backtracking, check for forward behind latest backtracking * use separate idle counter * debug log * 2 idleCountBeforeHeartbeat --------- Co-authored-by: Patrik Nordwall --- .../dynamodb/internal/BySliceQuery.scala | 21 +++++++++++++------ .../query/scaladsl/DynamoDBReadJournal.scala | 14 ++++++++++++- .../EventsBySlicePubSubBacktrackingSpec.scala | 2 +- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala index 73bfc8b..de83618 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala @@ -42,7 +42,8 @@ import org.slf4j.Logger startTimestamp = Instant.EPOCH, startWallClock = Instant.EPOCH, currentQueryWallClock = Instant.EPOCH, - previousQueryWallClock = Instant.EPOCH) + previousQueryWallClock = Instant.EPOCH, + idleCountBeforeHeartbeat = 0) } final case class QueryState( @@ -60,7 +61,8 @@ import org.slf4j.Logger startTimestamp: Instant, startWallClock: Instant, currentQueryWallClock: Instant, - previousQueryWallClock: Instant) { + previousQueryWallClock: Instant, + idleCountBeforeHeartbeat: Long) { def backtracking: Boolean = backtrackingCount > 0 @@ -301,6 +303,10 @@ import org.slf4j.Logger def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = { val newIdleCount = if (state.itemCount == 0) state.idleCount + 1 else 0 + val newIdleCountBeforeHeartbeat = + if (state.backtracking) state.idleCountBeforeHeartbeat + else if (state.itemCount == 0) state.idleCountBeforeHeartbeat + 1 + else 0 // start tracking query wall clock for heartbeats after initial backtracking query val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() @@ -324,7 +330,8 @@ import org.slf4j.Logger latestBacktracking = fromOffset, backtrackingExpectFiltered = state.latestBacktrackingSeenCount, currentQueryWallClock = newQueryWallClock, - previousQueryWallClock = state.currentQueryWallClock) + previousQueryWallClock = state.currentQueryWallClock, + idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat) } else if (switchFromBacktracking(state)) { // switching from backtracking state.copy( @@ -334,7 +341,8 @@ import org.slf4j.Logger idleCount = newIdleCount, backtrackingCount = 0, currentQueryWallClock = newQueryWallClock, - previousQueryWallClock = state.currentQueryWallClock) + previousQueryWallClock = state.currentQueryWallClock, + idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat) } else { // continuing val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0 @@ -346,7 +354,8 @@ import org.slf4j.Logger backtrackingCount = newBacktrackingCount, backtrackingExpectFiltered = state.latestBacktrackingSeenCount, currentQueryWallClock = newQueryWallClock, - previousQueryWallClock = state.currentQueryWallClock) + previousQueryWallClock = state.currentQueryWallClock, + idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat) } val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow) @@ -399,7 +408,7 @@ import org.slf4j.Logger } def heartbeat(state: QueryState): Option[Envelope] = { - if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) { + if (state.idleCountBeforeHeartbeat >= 2 && state.previousQueryWallClock != Instant.EPOCH) { // use wall clock to measure duration since start, up to idle backtracking limit val timestamp = state.startTimestamp.plus( JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime))) diff --git a/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala b/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala index a5d1072..fd95303 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala @@ -572,12 +572,24 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg case None => Instant.EPOCH } env => { + val slice = persistenceExt.sliceForPersistenceId(env.persistenceId) env.offset match { case t: TimestampOffset => if (EnvelopeOrigin.fromQuery(env)) { + if (log.isDebugEnabled()) { + val l = latestBacktracking(slice) + if (l.isAfter(t.timestamp)) + log.debug( + "event from query for persistenceId [{}] seqNr [{}] " + + s"timestamp [{}] was before last event from backtracking or heartbeat [{}].", + env.persistenceId, + env.sequenceNr, + t.timestamp, + l) + } + env :: Nil } else { - val slice = persistenceExt.sliceForPersistenceId(env.persistenceId) if (EnvelopeOrigin.fromBacktracking(env)) { latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp) env :: Nil diff --git a/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubBacktrackingSpec.scala index 5c25dca..672eed0 100644 --- a/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubBacktrackingSpec.scala @@ -32,7 +32,7 @@ object EventsBySlicePubSubBacktrackingSpec { akka.persistence.dynamodb { journal.publish-events = on query { - refresh-interval = 1 s + refresh-interval = 300 ms # Ensure pubsub arrives first behind-current-time = 2s