Skip to content

Commit

Permalink
Transformer Kinesis: recover from IllegalArgumentException when check…
Browse files Browse the repository at this point in the history
…pointing near end of shard (close #1088)
  • Loading branch information
spenes committed Oct 17, 2022
1 parent f63fa3d commit e6ebe3c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.Region

object Kinesis {

case class Message[F[_]](content: String, ack: F[Unit], shardId: String) extends Queue.Consumer.Message[F]
case class Message[F[_]: Sync](record: CommittableRecord) extends Queue.Consumer.Message[F] {
override def content: String = getContent(record)
override def ack: F[Unit] = record.checkpoint
}

def consumer[F[_] : ConcurrentEffect : ContextShift : Timer : Applicative](blocker: Blocker,
appName: String,
Expand Down Expand Up @@ -91,7 +94,7 @@ object Kinesis {
consumer = new Queue.Consumer[F] {
override def read: Stream[F, Consumer.Message[F]] =
kinesis.readFromKinesisStream(consumerSettings)
.map(r => Message(getContent(r), r.checkpoint, r.shardId))
.map(r => Message(r))
}
} yield consumer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,37 @@ object KinesisCheckpointer {

def checkpointer[F[_] : Sync](message: Queue.Consumer.Message[F]): KinesisCheckpointer[F] =
message match {
case m: Kinesis.Message[F] => KinesisCheckpointer[F](Map(m.shardId -> safelyCheckpoint(m)))
case m: Kinesis.Message[F] => KinesisCheckpointer[F](Map(m.record.shardId -> safelyCheckpoint(m)))
case _ => Checkpointer[F, KinesisCheckpointer[F]].empty
}

private def safelyCheckpoint[F[_] : Sync](message: Kinesis.Message[F]): F[Unit] =
message.ack.recoverWith {
// The ShardRecordProcessor instance has been shutdown. This just means another KCL worker
// has stolen our lease. It is expected during autoscaling of instances, and is safe to
// ignore.
case _: ShutdownException =>
Logger[F].warn(s"Skipping checkpointing of shard ${message.shardId} because this worker no longer owns the lease")
// The ShardRecordProcessor instance has been shutdown. This just means another KCL worker
// has stolen our lease. It is expected during autoscaling of instances, and is safe to
// ignore.
Logger[F].warn(s"Skipping checkpointing of shard ${message.record.shardId} because this worker no longer owns the lease")

case _: IllegalArgumentException if message.record.isLastInShard =>
// Copied from enrich
// See https://github.com/snowplow/enrich/issues/657 and https://github.com/snowplow/snowplow-rdb-loader/issues/1088
// This can happen at the shard end when KCL no longer allows checkpointing of the last record in the shard.
// We need to release the semaphore, so that fs2-aws handles checkpointing the end of the shard.
Logger[F].warn(
s"Checkpointing failed on last record in shard. Ignoring error and instead try checkpointing of the shard end"
) *>
Sync[F].delay(message.record.lastRecordSemaphore.release())

case _: IllegalArgumentException if message.record.lastRecordSemaphore.availablePermits === 0 =>
// Copied from enrich
// See https://github.com/snowplow/enrich/issues/657 and https://github.com/snowplow/snowplow-rdb-loader/issues/1088
// This can happen near the shard end, e.g. the penultimate batch in the shard, when KCL has already enqueued the final record in the shard to the fs2 queue.
// We must not release the semaphore yet, because we are not ready for fs2-aws to checkpoint the end of the shard.
// We can safely ignore the exception and move on.
Logger[F].warn(
s"Checkpointing failed on a record which was not the last in the shard. Meanwhile, KCL has already enqueued the final record in the shard to the fs2 queue. Ignoring error and instead continue processing towards the shard end"
)
}

implicit def kinesisCheckpointer[F[_] : Applicative]: Checkpointer[F, KinesisCheckpointer[F]] = new Checkpointer[F, KinesisCheckpointer[F]] {
Expand Down

0 comments on commit e6ebe3c

Please sign in to comment.