Commit 604e31f (parent: ff1d4bb)

Use PutRecords to batch up failed inserts sent to kinesis (close #239)

11 files changed: +244 -58 lines

config/config.kinesis.reference.hocon (+9)

@@ -139,6 +139,15 @@
     # Optional endpoint url configuration to override aws kinesis endpoints,
     # this can be used to specify local endpoints when using localstack
     "customEndpoint": "127.0.0.1:7846"
+
+    # Optional. Limits the number of events in a single PutRecords request.
+    # Maximum allowed: 500
+    "recordLimit": 500
+
+    # Optional. Limits the number of bytes in a single PutRecords request,
+    # including records and partition keys.
+    # Maximum allowed: 5 MB
+    "byteLimit": 5242880
   }
 }
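For reference, the default byteLimit of 5242880 is 5 MB (5 × 1024 × 1024), matching the PutRecords payload cap, and a record's contribution to that limit is its serialized body plus its partition key. A minimal sketch of that size accounting, mirroring what groupOutputs does later in this commit (the recordSize helper is illustrative, not part of the change):

import java.nio.charset.StandardCharsets.UTF_8

// Illustrative only: how one record counts against byteLimit
def recordSize(partitionKey: String, record: String): Int =
  record.getBytes(UTF_8).length + partitionKey.getBytes(UTF_8).length

assert(5 * 1024 * 1024 == 5242880) // default byteLimit = 5 MB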

core/src/main/resources/application.conf (+5 -1)

@@ -25,10 +25,14 @@
         "signing": false
       }
     }
+    "bad": {
+      "recordLimit": 500
+      "byteLimit": 5242880
+    }
   }
   "monitoring": {
     "metrics": {
       "cloudWatch": true
     }
   }
-}
+}

core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala (+3 -1)

@@ -147,7 +147,9 @@ object Config {
     final case class Kinesis(
       streamName: String,
       region: Region,
-      customEndpoint: Option[String]
+      customEndpoint: Option[String],
+      recordLimit: Int,
+      byteLimit: Int
     ) extends BadSink
   }
 }
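A minimal sketch of a value of the extended case class, assuming the defaults from application.conf above (the stream name and region here are illustrative):

// Hypothetical value, mirroring what the config decoder produces
// when application.conf supplies the "bad" defaults:
val badSink = Config.Sink.BadSink.Kinesis(
  streamName     = "test-kinesis-bad-stream",
  region         = Region("eu-central-1"),
  customEndpoint = None,
  recordLimit    = 500,    // PutRecords allows at most 500 records per call
  byteLimit      = 5242880 // PutRecords allows at most 5 MB per call
)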

core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala (+6 -8)

@@ -71,10 +71,10 @@ class Emitter(
     // Send all valid records to stdout / Sink and return those rejected by it
     val rejects = goodSink match {
       case Left(s) =>
-        validRecords.foreach {
-          case (_, Validated.Valid(r)) => s.store(r.json.toString, None, true)
-          case _ => ()
+        val validStrings = validRecords.collect { case (_, Validated.Valid(r)) =>
+          r.json.toString
         }
+        s.store(validStrings, true)
         Nil
       case Right(_) if validRecords.isEmpty => Nil
       case Right(sender) => emit(validRecords, sender)
@@ -164,11 +164,9 @@ class Emitter(
    * @param records List of failed records
    */
   override def fail(records: JList[EmitterJsonInput]): Unit = {
-    records.asScala.foreach {
-      case (r, Validated.Invalid(fs)) =>
-        val badRow = createBadRow(r, fs)
-        badSink.store(badRow.compact, None, false)
-      case (_, Validated.Valid(_)) => ()
+    val badRows = records.asScala.toList.collect { case (r, Validated.Invalid(fs)) =>
+      createBadRow(r, fs).compact
     }
+    badSink.store(badRows, false)
   }
 }
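Both hunks apply the same pattern: instead of one store call per record, collect the matching cases into a List and hand the whole batch to the sink in a single call. A standalone sketch of that collect idiom (the values are illustrative):

import cats.data.Validated

// Illustrative stand-in for the (original, validated) pairs the Emitter sees
val records: List[(String, Validated[String, Int])] = List(
  "a" -> Validated.Valid(1),
  "b" -> Validated.Invalid("bad"),
  "c" -> Validated.Valid(3)
)

// collect keeps only the cases the partial function matches, in one pass
val valids: List[Int]      = records.collect { case (_, Validated.Valid(n)) => n }
val invalids: List[String] = records.collect { case (_, Validated.Invalid(e)) => e }
// valids == List(1, 3); invalids == List("bad")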

core/src/main/scala/com.snowplowanalytics.stream/loader/executors/StdinExecutor.scala (+2 -2)

@@ -48,10 +48,10 @@ class StdinExecutor(
   def run = for (ln <- scala.io.Source.stdin.getLines) {
     val (line, result) = transformer.consumeLine(ln)
     result.bimap(
-      f => badSink.store(createBadRow(line, f).compact, None, false),
+      f => badSink.store(List(createBadRow(line, f).compact), false),
       s =>
         goodSink match {
-          case Left(gs) => gs.store(s.json.toString, None, true)
+          case Left(gs) => gs.store(List(s.json.toString), true)
           case Right(sender) => sender.send(List(ln -> s.valid))
         }
     )

core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/ISink.scala (+2 -2)

@@ -23,12 +23,12 @@ package sinks
  * Shared interface for all sinks
  */
 trait ISink {
-  def store(output: String, key: Option[String], good: Boolean): Unit
+  def store(outputs: List[String], good: Boolean): Unit
 }
 
 /**
  * Sink which ignores all input
  */
 class NullSink extends ISink {
-  def store(output: String, key: Option[String], good: Boolean): Unit = ()
+  def store(outputs: List[String], good: Boolean): Unit = ()
 }
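With the partition-key parameter gone and store taking the whole batch, a custom sink only has to handle lists. A minimal sketch against the new trait (InMemorySink is a hypothetical name, e.g. for tests, not part of the commit):

// Hypothetical sink that accumulates everything it is given
class InMemorySink extends ISink {
  private var buffer: List[String] = Nil

  def store(outputs: List[String], good: Boolean): Unit =
    buffer = buffer ++ outputs // one call now delivers a whole batch

  def contents: List[String] = buffer
}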

core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala (+68 -27)

@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory
 
 // Scala
 import scala.util.{Failure, Random, Success, Try}
+import scala.collection.JavaConverters._
 
 // Amazon
 import com.amazonaws.services.kinesis.model._
@@ -43,6 +44,7 @@ import com.snowplowanalytics.stream.loader.Config.Sink.BadSink.{Kinesis => KinesisSinkConfig}
  * @param conf Config for Kinesis sink
  */
 class KinesisSink(conf: KinesisSinkConfig) extends ISink {
+  import KinesisSink._
 
   private lazy val log = LoggerFactory.getLogger(getClass)
 
@@ -78,40 +80,44 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink {
     case rnfe: ResourceNotFoundException => false
   }
 
-  private def put(name: String, data: ByteBuffer, key: String): PutRecordResult = {
-    val putRecordRequest = {
-      val p = new PutRecordRequest()
-      p.setStreamName(name)
-      p.setData(data)
-      p.setPartitionKey(key)
-      p
+  private def put(name: String, keyedData: List[KeyedData]): PutRecordsResult = {
+    val prres = keyedData.map { case (key, data) =>
+      new PutRecordsRequestEntry()
+        .withPartitionKey(key)
+        .withData(ByteBuffer.wrap(data))
     }
-    client.putRecord(putRecordRequest)
+    val putRecordsRequest =
+      new PutRecordsRequest()
+        .withStreamName(name)
+        .withRecords(prres.asJava)
+    client.putRecords(putRecordsRequest)
   }
 
   /**
-   * Write a record to the Kinesis stream
+   * Write records to the Kinesis stream
    *
-   * @param output The string record to write
-   * @param key A hash of the key determines to which shard the
-   *            record is assigned. Defaults to a random string.
+   * @param outputs The string records to write
    * @param good Unused parameter which exists to extend ISink
    */
-  def store(output: String, key: Option[String], good: Boolean): Unit =
-    Try {
-      put(
-        conf.streamName,
-        ByteBuffer.wrap(output.getBytes(UTF_8)),
-        key.getOrElse(Random.nextInt.toString)
-      )
-    } match {
-      case Success(result) =>
-        log.info("Writing successful")
-        log.info(s" + ShardId: ${result.getShardId}")
-        log.info(s" + SequenceNumber: ${result.getSequenceNumber}")
-      case Failure(f) =>
-        log.error("Writing failed")
-        log.error(" + " + f.getMessage)
+  def store(outputs: List[String], good: Boolean): Unit =
+    groupOutputs(conf.recordLimit, conf.byteLimit) {
+      outputs.map(s => Random.nextInt.toString -> s.getBytes(UTF_8))
+    }.foreach { keyedData =>
+      Try {
+        put(
+          conf.streamName,
+          keyedData
+        )
+      } match {
+        case Success(result) =>
+          log.info("Writing successful")
+          result.getRecords.asScala.foreach { record =>
+            log.debug(s" + ShardId: ${record.getShardId}")
+            log.debug(s" + SequenceNumber: ${record.getSequenceNumber}")
+          }
+        case Failure(f) =>
+          log.error("Writing to Kinesis failed", f)
+      }
     }
 
   implicit class AwsKinesisClientBuilderExtensions(builder: AmazonKinesisClientBuilder) {
@@ -127,3 +133,38 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink {
       if (cond) f(builder) else builder
   }
 }
+
+object KinesisSink {
+
+  // Represents a partition key and the serialized record content
+  private type KeyedData = (String, Array[Byte])
+
+  /**
+   * Takes a list of records and splits it into several lists, where each list is as big as
+   * possible while respecting the record limit and the size limit.
+   */
+  def groupOutputs(recordLimit: Int, byteLimit: Int)(
+    keyedData: List[KeyedData]
+  ): List[List[KeyedData]] = {
+    case class Batch(size: Int, count: Int, keyedData: List[KeyedData])
+
+    keyedData
+      .foldLeft(List.empty[Batch]) { case (acc, (key, data)) =>
+        val recordSize = data.length + key.getBytes(UTF_8).length
+        acc match {
+          case head :: tail =>
+            if (head.count + 1 > recordLimit || head.size + recordSize > byteLimit)
+              List(Batch(recordSize, 1, List(key -> data))) ++ List(head) ++ tail
+            else
+              List(
+                Batch(head.size + recordSize, head.count + 1, (key -> data) :: head.keyedData)
+              ) ++ tail
+          case Nil =>
+            List(Batch(recordSize, 1, List(key -> data)))
+        }
+      }
+      .map(_.keyedData.reverse)
+      .reverse
+  }
+
+}
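As a sanity check on the batching logic, here is a small illustrative trace of groupOutputs with deliberately tiny limits (the keys and payload sizes are made up; each record costs its payload plus its one-byte key):

// Illustrative invocation of KinesisSink.groupOutputs
val batches = KinesisSink.groupOutputs(recordLimit = 2, byteLimit = 10)(
  List(
    "a" -> Array.fill[Byte](4)(0), // 5 bytes with key: starts the first batch
    "b" -> Array.fill[Byte](4)(0), // fits: batch is now 10 bytes, 2 records
    "c" -> Array.fill[Byte](4)(0)  // record limit reached: starts a new batch
  )
)
// batches.map(_.map(_._1)) == List(List("a", "b"), List("c"))

Note that a new batch is started as soon as either limit would be exceeded, and the final reverses restore the original record order within and across batches.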

core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/NsqSink.scala (+6 -4)

@@ -25,6 +25,9 @@ import java.nio.charset.StandardCharsets.UTF_8
 // NSQ
 import com.snowplowanalytics.client.nsq.NSQProducer
 
+// Scala
+import scala.collection.JavaConverters._
+
 import com.snowplowanalytics.stream.loader.Config.Sink.BadSink.{Nsq => NsqSinkConfig}
 
 /**
@@ -39,10 +42,9 @@ class NsqSink(conf: NsqSinkConfig) extends ISink {
   /**
    * Writes a string to NSQ
    *
-   * @param output The string to write
-   * @param key Unused parameter which exists to implement ISink
+   * @param outputs The strings to write
    * @param good Unused parameter which exists to extend ISink
    */
-  override def store(output: String, key: Option[String], good: Boolean): Unit =
-    producer.produce(conf.streamName, output.getBytes(UTF_8))
+  override def store(outputs: List[String], good: Boolean): Unit =
+    producer.produceMulti(conf.streamName, outputs.map(_.getBytes(UTF_8)).asJava)
 }

core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/StdouterrSink.scala (+6 -8)

@@ -27,14 +27,12 @@ class StdouterrSink extends ISink {
   /**
    * Writes a string to stdout or stderr
    *
-   * @param output The string to write
-   * @param key Unused parameter which exists to implement ISink
+   * @param outputs The strings to write
    * @param good Whether to write to stdout or stderr
    */
-  def store(output: String, key: Option[String], good: Boolean) =
-    if (good) {
-      println(output) // To stdout
-    } else {
-      Console.err.println(output) // To stderr
-    }
+  def store(outputs: List[String], good: Boolean): Unit =
+    if (good)
+      outputs.foreach(println(_)) // To stdout
+    else
+      outputs.foreach(Console.err.println(_)) // To stderr
 }

core/src/test/scala/com.snowplowanalytics.stream.loader/ConfigSpec.scala (+12 -5)

@@ -77,7 +77,13 @@ class ConfigSpec extends Specification {
         Sink.GoodSink.Elasticsearch.ESChunk(999999, 499)
       ),
       Sink.BadSink
-        .Kinesis("test-kinesis-bad-stream", Region("eu-central-1"), "127.0.0.1:7846".some)
+        .Kinesis(
+          "test-kinesis-bad-stream",
+          Region("eu-central-1"),
+          "127.0.0.1:7846".some,
+          500,
+          5242880
+        )
     ),
     Purpose.Enriched,
     Monitoring(
@@ -123,7 +129,7 @@ class ConfigSpec extends Specification {
       Sink.GoodSink.Elasticsearch.ESCluster("good", None),
       Sink.GoodSink.Elasticsearch.ESChunk(1000000, 500)
     ),
-    Sink.BadSink.Kinesis("test-kinesis-bad-stream", DefaultTestRegion, None)
+    Sink.BadSink.Kinesis("test-kinesis-bad-stream", DefaultTestRegion, None, 500, 5242880)
   ),
   Purpose.Enriched,
   Monitoring(
@@ -295,7 +301,8 @@ class ConfigSpec extends Specification {
      Sink.GoodSink.Elasticsearch.ESCluster("testindex", None),
      Sink.GoodSink.Elasticsearch.ESChunk(206, 207)
    ),
-    Sink.BadSink.Kinesis("test-kinesis-bad-stream", Region("ca-central-1"), None)
+    Sink.BadSink
+      .Kinesis("test-kinesis-bad-stream", Region("ca-central-1"), None, 500, 5242880)
   ),
   Purpose.Bad,
   Monitoring(
@@ -341,7 +348,7 @@ class ConfigSpec extends Specification {
      Sink.GoodSink.Elasticsearch.ESCluster("good", None),
      Sink.GoodSink.Elasticsearch.ESChunk(1000000, 500)
    ),
-    Sink.BadSink.Kinesis("test-kinesis-bad-stream", DefaultTestRegion, None)
+    Sink.BadSink.Kinesis("test-kinesis-bad-stream", DefaultTestRegion, None, 500, 5242880)
   ),
   Purpose.Enriched,
   Monitoring(
@@ -387,7 +394,7 @@ class ConfigSpec extends Specification {
      Sink.GoodSink.Elasticsearch.ESCluster("good", None),
      Sink.GoodSink.Elasticsearch.ESChunk(1000000, 500)
    ),
-    Sink.BadSink.Kinesis("test-kinesis-bad-stream", Region("eu-west-2"), None)
+    Sink.BadSink.Kinesis("test-kinesis-bad-stream", Region("eu-west-2"), None, 500, 5242880)
   ),
   Purpose.Enriched,
   Monitoring(
