
Commit 003f24b

Merge branch 'release/0.12.1'

2 parents aa72b16 + 1d6bbd9

6 files changed, 18 insertions(+), 8 deletions(-)

CHANGELOG (+4)

@@ -1,3 +1,7 @@
+Version 0.12.1 (2020-05-13)
+---------------------------
+Fix NullPointerException before resharding (#156)
+
 Version 0.12.0 (2020-03-17)
 ---------------------------
 Add badges to README (#73)

README.md (+2 −2)

@@ -25,7 +25,7 @@ $ sbt compile
 The Snowplow Elasticsearch Loader has the following command-line interface:

 ```
-snowplow-elasticsearch-loader 0.12.0
+snowplow-elasticsearch-loader 0.12.1

 Usage: snowplow-elasticsearch-loader [options]

@@ -52,7 +52,7 @@ aws {
 Next, start the loader, making sure to specify your new config file:

 ```bash
-$ java -jar snowplow-elasticsearch-loader-http-0.12.0.jar --config my.conf
+$ java -jar snowplow-elasticsearch-loader-http-0.12.1.jar --config my.conf
 ```

 ## Find out more

build.sbt (+1 −1)

@@ -40,7 +40,7 @@ lazy val commonDependencies = Seq(
 lazy val buildSettings = Seq(
   organization := "com.snowplowanalytics",
   name := "snowplow-elasticsearch-loader",
-  version := "0.12.0",
+  version := "0.12.1",
   description := "Load the contents of a Kinesis stream or NSQ topic to Elasticsearch",
   scalaVersion := "2.12.10",
   scalacOptions := BuildSettings.compilerOptions,

core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala (+7 −3)

@@ -69,15 +69,16 @@ class Emitter(
   @throws[IOException]
   private def attemptEmit(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
     if (records.isEmpty) {
-      null
+      Nil
     } else {
       val (validRecords: List[EmitterJsonInput], invalidRecords: List[EmitterJsonInput]) =
         records.partition(_._2.isValid)
       // Send all valid records to stdout / Sink and return those rejected by it
       val rejects = goodSink match {
         case Some(s) =>
           validRecords.foreach {
-            case (_, record) => record.map(r => s.store(r.json.toString, None, true))
+            case (_, Validated.Valid(r)) => s.store(r.json.toString, None, true)
+            case _                       => ()
           }
           Nil
         case None if validRecords.isEmpty => Nil
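This first hunk is the heart of #156: `attemptEmit` is declared to return `List[EmitterJsonInput]`, so handing back `null` for an empty buffer gives the calling connector code a `null` where it expects a (possibly empty) list, and it fails as soon as the result is used as one. Empty buffers tend to get flushed when the KCL tears a processor down, which fits the "before resharding" symptom. The other change swaps a `map` over the valid side of cats' `Validated` for an explicit match. A minimal, self-contained sketch of the failure mode, with stand-in types rather than the project's:

```scala
// Stand-in sketch, not the project's code: why `Nil`, not `null`.
object NullVsNil extends App {
  // Old shape: "no rejects" encoded as null.
  def attemptEmitOld(records: List[String]): List[String] =
    if (records.isEmpty) null else records

  // New shape: "no rejects" is the empty list.
  def attemptEmitNew(records: List[String]): List[String] =
    if (records.isEmpty) Nil else records

  println(attemptEmitNew(Nil).size) // 0 — callers can treat the result as a list
  println(attemptEmitOld(Nil).size) // throws NullPointerException
}
```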
@@ -153,7 +154,10 @@ class Emitter(
   /**
    * Closes the Sink client when the KinesisConnectorRecordProcessor is shut down
    */
-  override def shutdown(): Unit = bulkSender.close()
+  override def shutdown(): Unit = {
+    println("Shutting down emitter")
+    bulkSender.close()
+  }

   /**
    * Handles records rejected by the JsonTransformer or by Sink

core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala (+1)

@@ -62,6 +62,7 @@ trait BulkSender[A] {
    * Terminate the application in a way the KCL cannot stop, prevents shutdown hooks from running
    */
   protected def forceShutdown(): Unit = {
+    log.info("BulkSender force shutdown")
     tracker.foreach { t =>
       // TODO: Instead of waiting a fixed time, use synchronous tracking or futures (when the tracker supports futures)
       SnowplowTracking.trackApplicationShutdown(t)

elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala (+3 −2)

@@ -95,7 +95,8 @@ class ElasticsearchBulkSender(
   }

   // do not close the es client, otherwise it will fail when resharding
-  override def close(): Unit = ()
+  override def close(): Unit =
+    log.info("Closing BulkSender")

   override def send(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
     val connectionAttemptStartTime = System.currentTimeMillis()
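The log-only `close()` is deliberate, per the comment kept in context: a reshard shuts old record processors down (triggering `Emitter.shutdown()` and, through it, `close()`) before new ones take over, and closing the shared Elasticsearch client at that point would leave subsequent sends with a dead client. A simplified sketch of the lifecycle contract being worked around, assuming the shape of the kinesis-connectors emitter interface (stand-in signatures, not the library's exact API):

```scala
// Stand-in sketch of the KCL connector emitter contract; the real
// interface is IEmitter[T] in amazon-kinesis-connectors.
trait EmitterLifecycle[T] {
  def emit(buffer: List[T]): List[T] // flush a buffer; return records the sink rejected
  def fail(records: List[T]): Unit   // handle records that could not be emitted
  def shutdown(): Unit               // runs whenever a record processor stops,
                                     // including on every Kinesis reshard
}
```

Anything `shutdown()` closes must therefore survive, or be recreated for, the next processor generation; logging instead of closing is the simplest safe option here.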
@@ -170,7 +171,7 @@ class ElasticsearchBulkSender(

   /** Logs the cluster health */
   override def logHealth(): Unit =
-    client.execute(clusterHealth) onComplete {
+    client.execute(clusterHealth).onComplete {
       case SSuccess(health) =>
         health match {
           case response =>
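The `logHealth` change is purely stylistic: dot notation instead of infix application for `onComplete`, which later Scala style guides discourage for non-symbolic methods. For reference, a minimal, self-contained sketch of `Future.onComplete` (`SSuccess` in the hunk is presumably a rename of `scala.util.Success`):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OnCompleteSketch extends App {
  // Stand-in for client.execute(clusterHealth); any Future behaves the same.
  val health: Future[String] = Future("green")

  // onComplete registers a callback that receives a Try[String]
  // once the future finishes, run on the implicit ExecutionContext.
  health.onComplete {
    case Success(status) => println(s"cluster health: $status")
    case Failure(err)    => println(s"health check failed: ${err.getMessage}")
  }

  Await.ready(health, 1.second) // keep the demo alive until the future resolves
  Thread.sleep(100)             // give the callback a moment to run before exit
}
```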
