
Commit 36aa914

Prevent the app from exiting on Elasticsearch error responses (close #233)
1 parent 5935cc4 · commit 36aa914

3 files changed: +50 -48 lines changed


core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala

Lines changed: 14 additions & 24 deletions
@@ -27,14 +27,13 @@ import org.slf4j.Logger
 // Scala
 import scala.concurrent.Future
 import scala.concurrent.duration._
-import scala.util.Random

 // cats
 import cats.effect.{ContextShift, IO}
 import scala.concurrent.ExecutionContext
 import cats.{Applicative, Id}

-import retry.{PolicyDecision, RetryDetails, RetryPolicy}
+import retry.{RetryDetails, RetryPolicies, RetryPolicy}

 // Snowplow
 import com.snowplowanalytics.snowplow.scalatracker.Tracker
@@ -81,28 +80,25 @@ object BulkSender {
   def delayPolicy[M[_]: Applicative](
     maxAttempts: Int,
     maxConnectionWaitTimeMs: Long
-  ): RetryPolicy[M] =
-    RetryPolicy.lift { status =>
-      if (status.retriesSoFar >= maxAttempts) PolicyDecision.GiveUp
-      else {
-        val maxDelay = 2.milliseconds * Math.pow(2, status.retriesSoFar.toDouble).toLong
-        val randomDelayNanos = (maxDelay.toNanos * Random.nextDouble()).toLong
-        val maxConnectionWaitTimeNano = maxConnectionWaitTimeMs * 1000
-        val delayNanos =
-          if (maxConnectionWaitTimeMs >= randomDelayNanos) maxConnectionWaitTimeNano
-          else randomDelayNanos
-        PolicyDecision.DelayAndRetry(new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS))
-      }
-    }
+  ): RetryPolicy[M] = {
+    val basePolicy =
+      RetryPolicies
+        .fullJitter(20.milliseconds)
+        .join(RetryPolicies.limitRetries(maxAttempts))
+    RetryPolicies.limitRetriesByCumulativeDelay(
+      FiniteDuration(maxConnectionWaitTimeMs, TimeUnit.MILLISECONDS),
+      basePolicy
+    )
+  }

   def onError(log: Logger, tracker: Option[Tracker[Id]], connectionAttemptStartTime: Long)(
     error: Throwable,
     details: RetryDetails
   ): IO[Unit] = {
-    val duration = (error, details) match {
-      case (error, RetryDetails.GivingUp(_, totalDelay)) =>
+    val duration = details match {
+      case RetryDetails.GivingUp(_, totalDelay) =>
         IO(log.error("Storage threw an unexpected exception. Giving up ", error)).as(totalDelay)
-      case (error, RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay)) =>
+      case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) =>
         IO(
           log.error(
             s"Storage threw an unexpected exception, after $retriesSoFar retries. Next attempt in $nextDelay ",
@@ -127,12 +123,6 @@ object BulkSender {
     }
   }

-  /** Predicate about whether or not we should retry sending stuff to ES */
-  def exPredicate: Throwable => Boolean = {
-    case _: Exception => true
-    case _ => false
-  }
-
   def futureToTask[T](f: => Future[T]): IO[T] =
     IO.fromFuture(IO.delay(f))
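The replacement builds the delay schedule from stock cats-retry combinators instead of the hand-rolled jitter: fullJitter picks a random delay up to an exponentially growing cap, join makes the schedule give up once limitRetries is exhausted, and limitRetriesByCumulativeDelay additionally gives up once the total delay crosses the connection-wait budget. A minimal sketch that steps the composed policy by hand, assuming cats-retry 2.1.1; the 5-attempt cap and 5-second budget are illustrative stand-ins for maxAttempts and maxConnectionWaitTimeMs:

import cats.Id
import retry.{PolicyDecision, RetryPolicies, RetryStatus}
import scala.concurrent.duration._

object DelayPolicyDemo extends App {
  // Same shape as the new BulkSender.delayPolicy, instantiated at Id so
  // each decision is a plain value we can print without running an effect.
  val policy =
    RetryPolicies.limitRetriesByCumulativeDelay[Id](
      5.seconds,
      RetryPolicies
        .fullJitter[Id](20.milliseconds)
        .join(RetryPolicies.limitRetries[Id](5))
    )

  var status = RetryStatus.NoRetriesYet
  var going  = true
  while (going)
    policy.decideNextRetry(status) match {
      case PolicyDecision.DelayAndRetry(delay) =>
        // Jittered delay; the cap doubles with each retry.
        println(s"retry ${status.retriesSoFar + 1}: sleep $delay")
        status = status.addRetry(delay)
      case PolicyDecision.GiveUp =>
        println(s"giving up after ${status.retriesSoFar} retries")
        going = false
    }
}

Each run prints a different jittered schedule, but the policy always gives up at whichever cap binds first, the retry count or the cumulative-delay budget.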

elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala

Lines changed: 34 additions & 22 deletions
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory

 // Scala
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
 import scala.util.{Failure => SFailure, Success => SSuccess}

 import org.elasticsearch.client.RestClient
@@ -43,7 +44,7 @@ import cats.syntax.validated._

 import retry.implicits._
 import retry.{RetryDetails, RetryPolicy}
-import retry.CatsEffect._
+import retry._

 import com.snowplowanalytics.snowplow.scalatracker.Tracker

@@ -116,9 +117,11 @@ class ElasticsearchBulkSender(

   override def send(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
     val connectionAttemptStartTime = System.currentTimeMillis()
-    implicit def onErrorHandler: (Throwable, RetryDetails) => IO[Unit] =
+    val onErrorHandler: (Throwable, RetryDetails) => IO[Unit] =
       BulkSender.onError(log, tracker, connectionAttemptStartTime)
-    implicit def retryPolicy: RetryPolicy[IO] =
+    def onFailureHandler[A](res: Response[A], rd: RetryDetails): IO[Unit] =
+      onErrorHandler(res.error.asException, rd)
+    val retryPolicy: RetryPolicy[IO] =
       BulkSender.delayPolicy[IO](maxAttempts, maxConnectionWaitTimeMs)

     // oldFailures - failed at the transformation step
@@ -132,7 +135,12 @@ class ElasticsearchBulkSender(
     val newFailures: List[EmitterJsonInput] = if (actions.nonEmpty) {
       BulkSender
         .futureToTask(client.execute(bulk(actions)))
-        .retryingOnSomeErrors(BulkSender.exPredicate)
+        .retryingOnFailuresAndAllErrors(
+          r => r.isSuccess,
+          retryPolicy,
+          onFailureHandler,
+          onErrorHandler
+        )
         .map(extractResult(records))
         .attempt
         .unsafeRunSync() match {
@@ -168,12 +176,14 @@ class ElasticsearchBulkSender(
   def extractResult(
     records: List[EmitterJsonInput]
   )(response: Response[BulkResponse]): List[EmitterJsonInput] =
-    response.result.items
-      .zip(records)
-      .flatMap { case (bulkResponseItem, record) =>
-        handleResponse(bulkResponseItem.error.map(_.reason), record)
-      }
-      .toList
+    response.fold(records) { result =>
+      result.items
+        .zip(records)
+        .flatMap { case (bulkResponseItem, record) =>
+          handleResponse(bulkResponseItem.error.map(_.reason), record)
+        }
+        .toList
+    }

   def composeObject(jsonRecord: JsonRecord): ElasticsearchObject = {
     val index = jsonRecord.shard match {
@@ -186,18 +196,20 @@ class ElasticsearchBulkSender(

   /** Logs the cluster health */
   override def logHealth(): Unit =
-    client.execute(clusterHealth).onComplete {
-      case SSuccess(health) =>
-        health match {
-          case response =>
-            response.result.status match {
-              case "green" => log.info("Cluster health is green")
-              case "yellow" => log.warn("Cluster health is yellow")
-              case "red" => log.error("Cluster health is red")
-            }
-        }
-      case SFailure(e) => log.error("Couldn't retrieve cluster health", e)
-    }
+    client
+      .execute(clusterHealth)
+      .flatMap { health =>
+        health.fold(failure => Future.failed(failure.error.asException), Future.successful(_))
+      }
+      .onComplete {
+        case SSuccess(result) =>
+          result.status match {
+            case "green" => log.info("Cluster health is green")
+            case "yellow" => log.warn("Cluster health is yellow")
+            case "red" => log.error("Cluster health is red")
+          }
+        case SFailure(e) => log.error("Couldn't retrieve cluster health", e)
+      }

   /**
    * Handle the response given for a bulk request, by producing a failure if we failed to insert
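The substantive fix is in send: the old retryingOnSomeErrors only retried thrown exceptions, so an Elasticsearch error that came back as a well-formed failed Response value was never retried. retryingOnFailuresAndAllErrors retries both cases: results failing the r.isSuccess predicate (logged via onFailureHandler) and thrown exceptions (logged via onErrorHandler). A self-contained sketch of those semantics, assuming cats-retry 2.1.1 on cats-effect 2; the hypothetical flakyRequest returning Either[String, Int] stands in for elastic4s's client.execute returning Response[BulkResponse]:

import cats.effect.{IO, Timer}
import retry.implicits._
import retry.{RetryDetails, RetryPolicies}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object RetryOnFailuresDemo extends App {
  // cats-retry derives its Sleep[IO] from this Timer (cats-effect 2).
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Hypothetical stand-in for the bulk request: two "error responses",
  // then a success. Left models a well-formed failure, not an exception.
  var calls = 0
  val flakyRequest: IO[Either[String, Int]] = IO {
    calls += 1
    if (calls < 3) Left(s"es error response #$calls") else Right(200)
  }

  val result = flakyRequest
    .retryingOnFailuresAndAllErrors(
      r => r.isRight,                                   // wasSuccessful
      RetryPolicies.constantDelay[IO](10.milliseconds), // retry policy
      (r, _) => IO(println(s"failed response, will retry: $r")),
      (e: Throwable, _: RetryDetails) => IO(println(s"threw, will retry: ${e.getMessage}"))
    )
    .unsafeRunSync()

  println(s"final: $result") // Right(200), after two logged failed responses
}

Failed responses and thrown exceptions now flow through the same retry policy, so an error response can no longer escape the retry loop and take the app down.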

project/Dependencies.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ object Dependencies {
   val netty = "4.1.67.Final" // Override provided version to fix security vulnerability
   val jackson = "2.12.7"
   // Scala
-  val catsRetry = "0.3.2"
+  val catsRetry = "2.1.1"
   val circe = "0.14.1"
   val decline = "2.1.0"
   val snowplowTracker = "1.0.0"
@@ -51,7 +51,7 @@ object Dependencies {
   val netty = "io.netty" % "netty-all" % V.netty
   val jacksonCbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.jackson // Override provided version to fix security vulnerability
   // Scala
-  val catsRetry = "com.github.cb372" %% "cats-retry-cats-effect" % V.catsRetry
+  val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
   val circeOptics = "io.circe" %% "circe-optics" % V.circe
   val decline = "com.monovore" %% "decline" % V.decline
   val jacksonScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % V.jackson // Compatible version required for elastic4s
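One note on the coordinates: cats-retry 0.x shipped its cats-effect support as the separate cats-retry-cats-effect module, which is why the artifact name changes along with the version; from 1.0 onward everything lives in a single cats-retry module. The equivalent plain build.sbt line, for reference:

// build.sbt equivalent of the bumped dependency; cats-retry 2.x is a
// single module, so no companion cats-effect artifact is needed.
libraryDependencies += "com.github.cb372" %% "cats-retry" % "2.1.1"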
