Commit fae910c
Make the emitter generic (Closes snowplow#162)
szareiangm committed Jul 6, 2020
1 parent 003f24b commit fae910c
Showing 6 changed files with 70 additions and 61 deletions.
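
The heart of the change, summarized from the diff below (a sketch of the two constructor signatures, not a verbatim excerpt): Emitter drops its goodSink parameter and becomes generic in the record type, so any BulkSender[T] implementation can reuse the same buffering and failure handling.

// Before: JSON-specific, with an optional good sink
class Emitter(
  bulkSender: BulkSender[EmitterJsonInput],
  goodSink: Option[ISink],
  badSink: ISink,
  bufferRecordLimit: Long,
  bufferByteLimit: Long
) extends IEmitter[EmitterJsonInput]

// After: generic in the record type T; good records flow only through the BulkSender
class Emitter[T](
  bulkSender: BulkSender[T],
  badSink: ISink,
  bufferRecordLimit: Long,
  bufferByteLimit: Long
) extends IEmitter[T]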
@@ -40,11 +40,13 @@ case class EsLoaderBadRow(line: String, errors: NonEmptyList[String]) {
.withZone(DateTimeZone.UTC)

def toCompactJson =
Json.obj(
"line" -> line.asJson,
"errors" -> errors.asJson,
"failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson
).noSpaces
Json
.obj(
"line" -> line.asJson,
"errors" -> errors.asJson,
"failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson
)
.noSpaces
}

object EsLoaderBadRow {
@@ -28,6 +28,7 @@ import com.amazonaws.services.kinesis.connectors.interfaces.{
}
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration
import com.amazonaws.services.kinesis.connectors.impl.{AllPassFilter, BasicMemoryBuffer}
import emitters.Emitter

// This project
import com.snowplowanalytics.stream.loader.sinks._
@@ -60,7 +61,7 @@ class KinesisPipeline(
) extends IKinesisConnectorPipeline[ValidatedJsonRecord, EmitterJsonInput] {

def getEmitter(configuration: KinesisConnectorConfiguration): IEmitter[EmitterJsonInput] =
new Emitter(bulkSender, goodSink, badSink, bufferRecordLimit, bufferByteLimit)
new Emitter(bulkSender, badSink, bufferRecordLimit, bufferByteLimit)

def getBuffer(configuration: KinesisConnectorConfiguration): IBuffer[ValidatedJsonRecord] =
new BasicMemoryBuffer[ValidatedJsonRecord](configuration)
@@ -17,91 +17,94 @@
* governing permissions and limitations there under.
*/
package com.snowplowanalytics.stream.loader
package emitters

// Amazon
import cats.data.NonEmptyList
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
import com.snowplowanalytics.stream.loader.clients.BulkSender
import com.snowplowanalytics.stream.loader.sinks.ISink
import com.snowplowanalytics.stream.loader.EsLoaderBadRow

import scala.collection.mutable.ListBuffer

// Java
import java.io.IOException
import java.util.{List => JList}
import java.util.{List => List}

// cats
import cats.data.Validated

// Scala
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
import scala.collection.immutable.{List => SList}

// This project
import sinks.ISink
import clients.BulkSender

/**
* Emitter class for any sort of BulkSender Extension
*
* @param bulkSender The bulkSender Client to use for the sink
* @param goodSink the configured GoodSink
* @param badSink the configured BadSink
* @param bufferRecordLimit record limit for buffer
* @param bufferByteLimit byte limit for buffer
*/
class Emitter(
bulkSender: BulkSender[EmitterJsonInput],
goodSink: Option[ISink],
class Emitter[T](
bulkSender: BulkSender[T],
badSink: ISink,
bufferRecordLimit: Long,
bufferByteLimit: Long
) extends IEmitter[EmitterJsonInput] {

@throws[IOException]
override def emit(buffer: UnmodifiableBuffer[EmitterJsonInput]): JList[EmitterJsonInput] =
attemptEmit(buffer.getRecords.asScala.toList).asJava
) extends IEmitter[T] {

/**
* Emits good records to stdout or sink.
* All records which sink rejects and all records which failed transformation
* This function is called from AWS library.
* Emits good records to Kinesis, Postgres, S3 or Elasticsearch.
* All records which Elasticsearch rejects and all records which failed transformation
get sent to stderr or Kinesis.
*
* @param records list containing EmitterJsonInputs
* @return list of inputs which failed transformation or which the sink rejected
* @param buffer list containing EmitterInputs
* @return list of inputs which failed transformation or which Elasticsearch rejected
*/
@throws[IOException]
private def attemptEmit(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
if (records.isEmpty) {
Nil
def emit(buffer: UnmodifiableBuffer[T]): List[T] =
if (buffer.getRecords.asScala.isEmpty) {
null
} else {
val (validRecords: List[EmitterJsonInput], invalidRecords: List[EmitterJsonInput]) =
records.partition(_._2.isValid)
// Send all valid records to stdout / Sink and return those rejected by it
val rejects = goodSink match {
case Some(s) =>
validRecords.foreach {
case (_, Validated.Valid(r)) => s.store(r.json.toString, None, true)
case _ => ()
}
Nil
case None if validRecords.isEmpty => Nil
case _ => emit(validRecords)
}
invalidRecords ++ rejects
}
}
// Send all valid records to the bulk sender and return rejected/unvalidated ones.
sliceAndSend(buffer.getRecords.asScala.toList)
}.asJava

/**
* This is called from NsqSourceExecutor
* Emits good records to Sink and bad records to Kinesis.
* All valid records in the buffer get sent to the sink in a bulk request.
* All invalid requests and all requests which failed transformation get sent to Kinesis.
*
* @param records List of records to send
* @return List of inputs which the sink rejected
*/
def emit(records: List[EmitterJsonInput]): List[EmitterJsonInput] =
def emitList(records: SList[T]): SList[T] =
for {
recordSlice <- splitBuffer(records, bufferByteLimit, bufferRecordLimit)
result <- bulkSender.send(recordSlice)
} yield result

/**
* Emits good records to Elasticsearch and bad records to Kinesis.
* All valid records in the buffer get sent to Elasticsearch in a bulk request.
* All invalid requests and all requests which failed transformation get sent to Kinesis.
*
* @param records List of records to send to Elasticsearch
* @return List of inputs which Elasticsearch rejected
*/
def sliceAndSend(records: SList[T]): SList[T] = {
val failures: SList[SList[T]] = for {
recordSlice <- splitBuffer(records, bufferByteLimit, bufferRecordLimit)
} yield bulkSender.send(recordSlice)
failures.flatten
}

/**
* Splits the buffer into emittable chunks based on the
* buffer settings defined in the config
@@ -111,16 +114,16 @@ class Emitter(
* @param recordLimit emitter record limit
* @return a list of buffers
*/
private def splitBuffer(
records: List[EmitterJsonInput],
def splitBuffer(
records: SList[T],
byteLimit: Long,
recordLimit: Long
): List[List[EmitterJsonInput]] = {
): SList[SList[T]] = {
// partition the records in
val remaining: ListBuffer[EmitterJsonInput] = records.to[ListBuffer]
val buffers: ListBuffer[List[EmitterJsonInput]] = new ListBuffer
val curBuffer: ListBuffer[EmitterJsonInput] = new ListBuffer
var runningByteCount: Long = 0L
val remaining: ListBuffer[T] = records.to[ListBuffer]
val buffers: ListBuffer[SList[T]] = new ListBuffer
val curBuffer: ListBuffer[T] = new ListBuffer
var runningByteCount: Long = 0L

while (remaining.nonEmpty) {
val record = remaining.remove(0)
@@ -164,9 +167,9 @@ class Emitter(
*
* @param records List of failed records
*/
override def fail(records: JList[EmitterJsonInput]): Unit = {
override def fail(records: List[T]): Unit = {
records.asScala.foreach {
case (r: String, Validated.Invalid(fs)) =>
case (r: String, Validated.Invalid(fs: NonEmptyList[String])) =>
val output = EsLoaderBadRow(r, fs).toCompactJson
badSink.store(output, None, false)
case (_, Validated.Valid(_)) => ()
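
Callers drive the generic emitter the way NsqSourceExecutor does below: emitList slices the batch by the configured byte/record limits and bulk-sends each slice, and fail serializes whatever comes back as bad rows for the bad sink. A minimal caller-side sketch, assuming bulkSender: BulkSender[EmitterJsonInput], badSink: ISink, a records: List[EmitterJsonInput] batch, and the recordLimit/byteLimit buffer settings are already in scope (only the send/store shapes visible in this diff are relied on):

import scala.collection.JavaConverters._

val emitter = new Emitter[EmitterJsonInput](bulkSender, badSink, recordLimit, byteLimit)

// split by the buffer limits, bulk-send each slice, and collect the rejected inputs
val rejected: List[EmitterJsonInput] = emitter.emitList(records)

// rejected inputs are written to the bad sink as EsLoaderBadRow JSON
emitter.fail(rejected.asJava)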
@@ -27,6 +27,7 @@ import com.snowplowanalytics.client.nsq.NSQConfig
import com.snowplowanalytics.client.nsq.callbacks.NSQMessageCallback
import com.snowplowanalytics.client.nsq.callbacks.NSQErrorCallback
import com.snowplowanalytics.client.nsq.exceptions.NSQException
import emitters.Emitter

//Java
import java.nio.charset.StandardCharsets.UTF_8
@@ -71,9 +72,8 @@ class NsqSourceExecutor(
private val msgBuffer = new ListBuffer[EmitterJsonInput]()
// ElasticsearchEmitter instance
private val emitter =
new Emitter(
new Emitter[EmitterJsonInput](
bulkSender,
goodSink,
badSink,
config.streams.buffer.recordLimit,
config.streams.buffer.byteLimit)
@@ -98,7 +98,7 @@ class NsqSourceExecutor(
msg.finished()

if (msgBuffer.size == nsqBufferSize) {
val rejectedRecords = emitter.emit(msgBuffer.toList)
val rejectedRecords = emitter.emitList(msgBuffer.toList)
emitter.fail(rejectedRecords.asJava)
msgBuffer.clear()
}
@@ -14,6 +14,8 @@ package com.snowplowanalytics.stream.loader

// Java
import java.util.Properties

import emitters.Emitter
import org.slf4j.Logger

// Scala
@@ -75,7 +77,7 @@ class EmitterSpec extends Specification {

val kcc =
new KinesisConnectorConfiguration(new Properties, new DefaultAWSCredentialsProviderChain)
val eem = new Emitter(fakeSender, None, new StdouterrSink, 1, 1L)
val eem = new Emitter(fakeSender, new StdouterrSink, 1, 1L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
val invalidInput: EmitterJsonInput = "bad" -> "malformed event".invalidNel
@@ -96,7 +98,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 1, 1000L)
val eem = new Emitter(ess, new StdouterrSink, 1, 1000L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -120,7 +122,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 1, 1000L)
val eem = new Emitter(ess, new StdouterrSink, 1, 1000L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -144,7 +146,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 100, 1048576L)
val eem = new Emitter(ess, new StdouterrSink, 100, 1048576L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -168,7 +170,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 1, 1048576L)
val eem = new Emitter(ess, new StdouterrSink, 1, 1048576L)

val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid

@@ -192,7 +194,7 @@

val kcc = new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain)
val ess = new MockElasticsearchSender
val eem = new Emitter(ess, None, new StdouterrSink, 2, 200L)
val eem = new Emitter(ess, new StdouterrSink, 2, 200L)

// record size is 95 bytes
val validInput: EmitterJsonInput = "good" -> JsonRecord(Json.obj(), None).valid
@@ -16,6 +16,7 @@ package clients
// AWS
import com.amazonaws.services.kinesis.connectors.elasticsearch.ElasticsearchObject
import com.amazonaws.auth.AWSCredentialsProvider
import emitters.Emitter

// Java
import com.google.common.base.Charsets