Skip to content

Commit b5b4d8d

Browse files
committed
amendment 1: throttle acks per second
1 parent 5593fd6 commit b5b4d8d

File tree

3 files changed

+16
-7
lines changed

3 files changed

+16
-7
lines changed

modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Completion.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import java.net.URI
1111

1212
import cats.implicits._
1313

14-
import cats.effect.Sync
14+
import cats.effect.implicits._
15+
import cats.effect.{Async, Sync}
1516

1617
import io.circe.syntax.EncoderOps
1718

@@ -56,7 +57,7 @@ object Completion {
5657
* @param state
5758
* all metadata shredder extracted from a batch
5859
*/
59-
def seal[F[_]: Sync, C: Checkpointer[F, *]](
60+
def seal[F[_]: Async, C: Checkpointer[F, *]](
6061
blobStorage: BlobStorage[F],
6162
compression: Compression,
6263
getTypes: Set[Data.ShreddedType] => TypesInfo,
@@ -84,8 +85,9 @@ object Completion {
8485
)
8586
body = message.selfDescribingData(legacyMessageFormat).asJson.noSpaces
8687
_ <- writeFile(blobStorage, shreddingCompletePath, body)
87-
_ <- Checkpointer[F, C].checkpoint(state.checkpointer)
88+
fiber <- Checkpointer[F, C].checkpoint(state.checkpointer).start
8889
_ <- producer.send(body)
90+
_ <- fiber.join
8991
} yield ()
9092

9193
def writeFile[F[_]: Sync](

modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ object Processing {
104104
.through(windowing)
105105

106106
val sink: Pipe[F, Record[Window, List[(SinkPath, Transformed.Data)], State[C]], Unit] =
107-
_.through(getSink(resources, config.output, config.formats))
107+
_.through(getSink(resources, config.output, config.formats)).prefetch
108108
.evalMap(onComplete)
109109

110110
Shutdown

modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/PubsubCheckpointer.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,28 @@
77
*/
88
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.pubsub
99

10-
import cats.Applicative
10+
import cats.effect.Async
1111
import cats.implicits._
1212

1313
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.Queue
1414
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.Checkpointer
1515

16+
import scala.concurrent.duration.DurationLong
17+
1618
case class PubsubCheckpointer[F[_]](acks: List[F[Unit]])
1719

1820
object PubsubCheckpointer {
1921

22+
val acksPerSecond = 1000 // TODO: Find a neat way to make this configurable
23+
2024
def checkpointer[F[_]](message: Queue.Consumer.Message[F]): PubsubCheckpointer[F] =
2125
PubsubCheckpointer[F](List(message.ack))
2226

23-
implicit def pubsubCheckPointer[F[_]: Applicative]: Checkpointer[F, PubsubCheckpointer[F]] = new Checkpointer[F, PubsubCheckpointer[F]] {
24-
override def checkpoint(c: PubsubCheckpointer[F]): F[Unit] = c.acks.sequence_
27+
implicit def pubsubCheckpointer[F[_]: Async]: Checkpointer[F, PubsubCheckpointer[F]] = new Checkpointer[F, PubsubCheckpointer[F]] {
28+
override def checkpoint(c: PubsubCheckpointer[F]): F[Unit] =
29+
c.acks.grouped(acksPerSecond).toList.traverse_ { acks =>
30+
acks.sequence_ *> Async[F].sleep(1.second)
31+
}
2532

2633
override def combine(x: PubsubCheckpointer[F], y: PubsubCheckpointer[F]): PubsubCheckpointer[F] =
2734
PubsubCheckpointer[F](x.acks ::: y.acks)

0 commit comments

Comments
 (0)