Skip to content

Commit 599920a

Browse files
committed
Fix NullPointerException before resharding (close snowplow#156)
1 parent aa72b16 commit 599920a

File tree

3 files changed: +11 additions, -5 deletions

core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala

Lines changed: 7 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -69,15 +69,16 @@ class Emitter(
6969
@throws[IOException]
7070
private def attemptEmit(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
7171
if (records.isEmpty) {
72-
null
72+
Nil
7373
} else {
7474
val (validRecords: List[EmitterJsonInput], invalidRecords: List[EmitterJsonInput]) =
7575
records.partition(_._2.isValid)
7676
// Send all valid records to stdout / Sink and return those rejected by it
7777
val rejects = goodSink match {
7878
case Some(s) =>
7979
validRecords.foreach {
80-
case (_, record) => record.map(r => s.store(r.json.toString, None, true))
80+
case (_, Validated.Valid(r)) => s.store(r.json.toString, None, true)
81+
case _ => ()
8182
}
8283
Nil
8384
case None if validRecords.isEmpty => Nil
@@ -153,7 +154,10 @@ class Emitter(
153154
/**
154155
* Closes the Sink client when the KinesisConnectorRecordProcessor is shut down
155156
*/
156-
override def shutdown(): Unit = bulkSender.close()
157+
override def shutdown(): Unit = {
158+
println("Shutting down emitter")
159+
bulkSender.close()
160+
}
157161

158162
/**
159163
* Handles records rejected by the JsonTransformer or by Sink

core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -62,6 +62,7 @@ trait BulkSender[A] {
6262
* Terminate the application in a way the KCL cannot stop, prevents shutdown hooks from running
6363
*/
6464
protected def forceShutdown(): Unit = {
65+
log.info("BulkSender force shutdown")
6566
tracker.foreach { t =>
6667
// TODO: Instead of waiting a fixed time, use synchronous tracking or futures (when the tracker supports futures)
6768
SnowplowTracking.trackApplicationShutdown(t)

elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala

Lines changed: 3 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -95,7 +95,8 @@ class ElasticsearchBulkSender(
9595
}
9696

9797
// do not close the es client, otherwise it will fail when resharding
98-
override def close(): Unit = ()
98+
override def close(): Unit =
99+
log.info("Closing BulkSender")
99100

100101
override def send(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
101102
val connectionAttemptStartTime = System.currentTimeMillis()
@@ -170,7 +171,7 @@ class ElasticsearchBulkSender(
170171

171172
/** Logs the cluster health */
172173
override def logHealth(): Unit =
173-
client.execute(clusterHealth) onComplete {
174+
client.execute(clusterHealth).onComplete {
174175
case SSuccess(health) =>
175176
health match {
176177
case response =>

0 commit comments

Comments (0)