
Commit ff1d4bb

Improve use of Futures to avoid suspected deadlocks (close #238)
1 parent 36aa914

4 files changed (+29 -28 lines)

core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala (-1)

@@ -56,7 +56,6 @@ trait BulkSender[A] {
 
   def send(records: List[A]): List[A]
   def close(): Unit
-  def logHealth(): Unit
   def chunkConfig(): ESChunk
 
   /**
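The removal above narrows the BulkSender contract: logHealth was the only member that implied background work, and it now lives solely on the Elasticsearch sender with an effectful return type. A minimal sketch of the resulting trait shape (ESChunk's field names are assumptions, stubbed in so the snippet compiles on its own):

import scala.concurrent.Future

// Stub of the repo's chunking config; the field names here are assumptions.
final case class ESChunk(byteLimit: Long, recordLimit: Long)

// Shape of the trait after this commit (other members such as tracker and
// log are elided). logHealth is gone from the generic contract; the
// Elasticsearch sender now declares it directly as def logHealth(): Future[Unit].
trait BulkSender[A] {
  def send(records: List[A]): List[A]
  def close(): Unit
  def chunkConfig(): ESChunk
}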

core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala (+9 -11)

@@ -27,18 +27,14 @@ import java.nio.charset.StandardCharsets.UTF_8
 import org.slf4j.LoggerFactory
 
 // Scala
-import scala.util.{Failure, Random, Success}
+import scala.util.{Failure, Random, Success, Try}
 
 // Amazon
 import com.amazonaws.services.kinesis.model._
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
 
-// Concurrent libraries
-import scala.concurrent.Future
-import scala.concurrent.ExecutionContext.Implicits.global
-
 import com.snowplowanalytics.stream.loader.Config.Sink.BadSink.{Kinesis => KinesisSinkConfig}
 
 /**
@@ -82,7 +78,7 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink {
       case rnfe: ResourceNotFoundException => false
     }
 
-  private def put(name: String, data: ByteBuffer, key: String): Future[PutRecordResult] = Future {
+  private def put(name: String, data: ByteBuffer, key: String): PutRecordResult = {
     val putRecordRequest = {
       val p = new PutRecordRequest()
       p.setStreamName(name)
@@ -102,11 +98,13 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink {
    * @param good Unused parameter which exists to extend ISink
    */
   def store(output: String, key: Option[String], good: Boolean): Unit =
-    put(
-      conf.streamName,
-      ByteBuffer.wrap(output.getBytes(UTF_8)),
-      key.getOrElse(Random.nextInt.toString)
-    ) onComplete {
+    Try {
+      put(
+        conf.streamName,
+        ByteBuffer.wrap(output.getBytes(UTF_8)),
+        key.getOrElse(Random.nextInt.toString)
+      )
+    } match {
       case Success(result) =>
         log.info("Writing successful")
         log.info(s" + ShardId: ${result.getShardId}")

core/src/test/scala/com.snowplowanalytics.stream.loader/EmitterSpec.scala (-2)

@@ -51,7 +51,6 @@ case class MockElasticsearchSender(chunkConf: ESChunk) extends BulkSender[EmitterJsonInput] {
     List.empty
   }
   override def close() = {}
-  override def logHealth(): Unit = ()
   override def chunkConfig(): ESChunk = chunkConf
   override val tracker = None
   override val maxConnectionWaitTimeMs: Long = 1000L
@@ -67,7 +66,6 @@ class EmitterSpec extends Specification {
   val fakeSender: BulkSender[EmitterJsonInput] = new BulkSender[EmitterJsonInput] {
     override def send(records: List[EmitterJsonInput]): List[EmitterJsonInput] = List.empty
     override def close(): Unit = ()
-    override def logHealth(): Unit = ()
     override def chunkConfig(): ESChunk = ESChunk(1L, 1L)
     override val tracker = None
     override val log: Logger = null

elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala (+20 -14)

@@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory
 // Scala
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
-import scala.util.{Failure => SFailure, Success => SSuccess}
 
 import org.elasticsearch.client.RestClient
 
@@ -40,7 +39,7 @@ import org.apache.http.message.BasicHeader
 import cats.Id
 import cats.effect.{IO, Timer}
 import cats.data.Validated
-import cats.syntax.validated._
+import cats.implicits._
 
 import retry.implicits._
 import retry.{RetryDetails, RetryPolicy}
@@ -142,6 +141,15 @@ class ElasticsearchBulkSender(
          onErrorHandler
        )
        .map(extractResult(records))
+       .flatTap { failures =>
+         IO.delay(log.info(s"Emitted ${esObjects.size - failures.size} records to Elasticsearch"))
+       }
+       .flatTap { failures =>
+         if (failures.nonEmpty)
+           BulkSender.futureToTask(logHealth())
+         else
+           IO.unit
+       }
        .attempt
        .unsafeRunSync() match {
        case Right(s) => s
@@ -156,9 +164,6 @@ class ElasticsearchBulkSender(
        }
      } else Nil
 
-    log.info(s"Emitted ${esObjects.size - newFailures.size} records to Elasticsearch")
-    if (newFailures.nonEmpty) logHealth()
-
    val allFailures = oldFailures ++ newFailures
 
    if (allFailures.nonEmpty) log.warn(s"Returning ${allFailures.size} records as failed")
@@ -195,20 +200,21 @@ class ElasticsearchBulkSender(
  }
 
  /** Logs the cluster health */
-  override def logHealth(): Unit =
+  def logHealth(): Future[Unit] =
    client
      .execute(clusterHealth)
      .flatMap { health =>
        health.fold(failure => Future.failed(failure.error.asException), Future.successful(_))
      }
-      .onComplete {
-        case SSuccess(result) =>
-          result.status match {
-            case "green" => log.info("Cluster health is green")
-            case "yellow" => log.warn("Cluster health is yellow")
-            case "red" => log.error("Cluster health is red")
-          }
-        case SFailure(e) => log.error("Couldn't retrieve cluster health", e)
+      .map { result =>
+        result.status match {
+          case "green" => log.info("Cluster health is green")
+          case "yellow" => log.warn("Cluster health is yellow")
+          case "red" => log.error("Cluster health is red")
+        }
+      }
+      .recover { case t: Throwable =>
+        log.error("Couldn't retrieve cluster health", t)
      }
 
  /**
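Two things change above: the emit log and the health check now run inside the IO pipeline via flatTap, sequenced before unsafeRunSync(), instead of firing as detached side effects afterwards; and logHealth returns a Future[Unit] ending in recover, so a failed health lookup is logged and swallowed rather than failing the emit. BulkSender.futureToTask is referenced but not shown in this diff; below is a sketch of what such a Future-to-IO bridge typically looks like in cats-effect 2 (only the name comes from the diff, the body is an assumption):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import cats.effect.{ContextShift, IO}

object FutureBridgeSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(global)

  // Suspend a by-name Future in IO: the Future is only created when the IO
  // actually runs, so the surrounding pipeline (retries, flatTap sequencing)
  // triggers a fresh attempt each time instead of observing a Future that
  // started eagerly at construction.
  def futureToTask[A](fut: => Future[A]): IO[A] =
    IO.fromFuture(IO(fut))
}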
