Skip to content

Commit

Permalink
amendment 1: throttle acks per second
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 30, 2023
1 parent 5593fd6 commit 2375e64
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import java.net.URI

import cats.implicits._

import cats.effect.Sync
import cats.effect.implicits._
import cats.effect.{Async, Sync}

import io.circe.syntax.EncoderOps

Expand Down Expand Up @@ -56,7 +57,7 @@ object Completion {
* @param state
* all metadata shredder extracted from a batch
*/
def seal[F[_]: Sync, C: Checkpointer[F, *]](
def seal[F[_]: Async, C: Checkpointer[F, *]](
blobStorage: BlobStorage[F],
compression: Compression,
getTypes: Set[Data.ShreddedType] => TypesInfo,
Expand Down Expand Up @@ -84,8 +85,9 @@ object Completion {
)
body = message.selfDescribingData(legacyMessageFormat).asJson.noSpaces
_ <- writeFile(blobStorage, shreddingCompletePath, body)
_ <- Checkpointer[F, C].checkpoint(state.checkpointer)
fiber <- Checkpointer[F, C].checkpoint(state.checkpointer).start
_ <- producer.send(body)
_ <- fiber.join
} yield ()

def writeFile[F[_]: Sync](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ object Processing {
.through(windowing)

val sink: Pipe[F, Record[Window, List[(SinkPath, Transformed.Data)], State[C]], Unit] =
_.through(getSink(resources, config.output, config.formats))
_.through(getSink(resources, config.output, config.formats)).prefetch
.evalMap(onComplete)

Shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,28 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.pubsub

import cats.Applicative
import cats.effect.Async
import cats.implicits._

import com.snowplowanalytics.snowplow.rdbloader.common.cloud.Queue
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.Checkpointer

import scala.concurrent.duration.DurationLong

case class PubsubCheckpointer[F[_]](acks: List[F[Unit]])

object PubsubCheckpointer {

val acksPer100Millis = 100 // TODO: Find a neat way to make this configurable

def checkpointer[F[_]](message: Queue.Consumer.Message[F]): PubsubCheckpointer[F] =
PubsubCheckpointer[F](List(message.ack))

implicit def pubsubCheckPointer[F[_]: Applicative]: Checkpointer[F, PubsubCheckpointer[F]] = new Checkpointer[F, PubsubCheckpointer[F]] {
override def checkpoint(c: PubsubCheckpointer[F]): F[Unit] = c.acks.sequence_
implicit def pubsubCheckpointer[F[_]: Async]: Checkpointer[F, PubsubCheckpointer[F]] = new Checkpointer[F, PubsubCheckpointer[F]] {
override def checkpoint(c: PubsubCheckpointer[F]): F[Unit] =
c.acks.grouped(acksPer100Millis).toList.traverse_ { acks =>
acks.sequence_ *> Async[F].sleep(100.millis)
}

override def combine(x: PubsubCheckpointer[F], y: PubsubCheckpointer[F]): PubsubCheckpointer[F] =
PubsubCheckpointer[F](x.acks ::: y.acks)
Expand Down

0 comments on commit 2375e64

Please sign in to comment.