Skip to content

Commit

Permalink
Update bad rows sent by ES loader for enriched events (close #161)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jul 22, 2020
1 parent b5c3714 commit a86eec3
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 79 deletions.
6 changes: 2 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.kinesisConnector,
Dependencies.Libraries.nsqClient,
// Scala
Dependencies.Libraries.cats,
Dependencies.Libraries.catsEffect,
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.circeOptics,
Dependencies.Libraries.decline,
Expand All @@ -32,6 +30,7 @@ lazy val commonDependencies = Seq(
Dependencies.Libraries.awsSigner,
Dependencies.Libraries.pureconfig,
Dependencies.Libraries.pureconfigEnum,
Dependencies.Libraries.badRows,
// Scala (test only)
Dependencies.Libraries.specs2,
Dependencies.Libraries.circeLiteral
Expand Down Expand Up @@ -63,9 +62,8 @@ lazy val root = project

lazy val core = project
.settings(moduleName := "snowplow-elasticsearch-loader-core")
.settings(buildSettings)
.settings(allSettings)
.settings(BuildSettings.scalifySettings)
.settings(libraryDependencies ++= commonDependencies)

// project dealing with the ES
lazy val elasticsearch = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,10 @@ class Emitter(
*/
override def fail(records: JList[EmitterJsonInput]): Unit = {
records.asScala.foreach {
case (r: String, Validated.Invalid(fs)) =>
val output = EsLoaderBadRow(r, fs).toCompactJson
badSink.store(output, None, false)
case (r, Validated.Invalid(fs)) =>
val badRow = createBadRow(r, fs)
badSink.store(badRow.compact, None, false)
case (_, Validated.Valid(_)) => ()
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import scala.concurrent.duration._
import scala.util.Random

// cats
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext
import cats.{Applicative, Id}
import cats.syntax.functor._

Expand Down Expand Up @@ -74,13 +75,15 @@ trait BulkSender[A] {
}

object BulkSender {
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

def delayPolicy[M[_]: Applicative](
maxAttempts: Int,
maxConnectionWaitTimeMs: Long): RetryPolicy[M] =
RetryPolicy.lift { status =>
if (status.retriesSoFar >= maxAttempts) PolicyDecision.GiveUp
else {
val maxDelay = 2.milliseconds * Math.pow(2, status.retriesSoFar).toLong
val maxDelay = 2.milliseconds * Math.pow(2, status.retriesSoFar.toDouble).toLong
val randomDelayNanos = (maxDelay.toNanos * Random.nextDouble()).toLong
val maxConnectionWaitTimeNano = maxConnectionWaitTimeMs * 1000
val delayNanos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ package com.snowplowanalytics.stream.loader.executors

import cats.syntax.validated._

import com.snowplowanalytics.stream.loader.{EmitterJsonInput, EsLoaderBadRow}
import com.snowplowanalytics.stream.loader.EmitterJsonInput
import com.snowplowanalytics.stream.loader.Config.{StreamLoaderConfig, StreamType}
import com.snowplowanalytics.stream.loader.clients.BulkSender
import com.snowplowanalytics.stream.loader.sinks.ISink
Expand All @@ -23,6 +23,7 @@ import com.snowplowanalytics.stream.loader.transformers.{
EnrichedEventJsonTransformer,
PlainJsonTransformer
}
import com.snowplowanalytics.stream.loader.createBadRow

class StdinExecutor(
config: StreamLoaderConfig,
Expand All @@ -43,7 +44,7 @@ class StdinExecutor(
def run = for (ln <- scala.io.Source.stdin.getLines) {
val (line, result) = transformer.consumeLine(ln)
result.bimap(
f => badSink.store(EsLoaderBadRow(line, f).toCompactJson, None, false),
f => badSink.store(createBadRow(line, f).compact, None, false),
s =>
goodSink match {
case Some(gs) => gs.store(s.json.toString, None, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import cats.data.ValidatedNel
import cats.syntax.validated._

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.instances._
import com.snowplowanalytics.iglu.core.circe.implicits._

/**
* Class to convert bad events to ElasticsearchObjects
Expand Down Expand Up @@ -73,6 +73,7 @@ object BadEventTransformer {
root.obj.modify(renameField("payload")),
root.payload.raw.obj.modify(serializeField("parameters")),
root.failure.obj.modify(renameField("error")),
root.failure.obj.modify(renameField("errors")),
root.failure.obj.modify(renameField("message")),
root.failure.messages.each.obj.modify(renameField("error")),
root.failure.messages.each.obj.modify(serializeField("expectedMapping")),
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/scala/com.snowplowanalytics.stream/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
package com.snowplowanalytics.stream

// cats
import java.time.Instant

import cats.data.ValidatedNel
import cats.data.NonEmptyList

import com.snowplowanalytics.snowplow.badrows._

package object loader {

Expand All @@ -34,4 +38,14 @@ package object loader {
* The input type for the ElasticsearchSender objects
*/
type EmitterJsonInput = (String, ValidatedNel[String, JsonRecord])

val processor = Processor(generated.Settings.name, generated.Settings.version)

/**
 * Wrap a line that could not be processed into a generic bad row.
 *
 * The failure is stamped with the time at which this method is called, and the
 * bad row is attributed to the package-level `processor`.
 *
 * @param line   the raw input, stored unchanged as the bad row's raw payload
 * @param errors the non-empty list of error messages recorded in the failure
 * @return a `BadRow.GenericError` combining processor, failure and payload
 */
def createBadRow(line: String, errors: NonEmptyList[String]): BadRow.GenericError =
  BadRow.GenericError(
    processor,
    Failure.GenericFailure(Instant.now(), errors),
    Payload.RawPayload(line)
  )
}
1 change: 1 addition & 0 deletions project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ object BuildSettings {
assemblyJarName in assembly := { s"${moduleName.value}-${version.value}.jar" },
test in assembly := {},
assemblyMergeStrategy in assembly := {
case x if x.endsWith("module-info.class") => MergeStrategy.discard // not used by JDK8
case "META-INF/io.netty.versions.properties" => MergeStrategy.first
case PathList("org", "joda", "time", "base", "BaseDateTime.class") => MergeStrategy.first
case x =>
Expand Down
12 changes: 4 additions & 8 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@ object Dependencies {
val elasticsearch = "6.3.2"
val nsqClient = "1.1.0-rc1"
val jackson = "2.9.6"

// Scala
val cats = "1.6.1"
val catsEffect = "1.3.1"
val catsRetry = "0.2.5"
val circe = "0.11.2"
val circeOptics = "0.11.0"
val circe = "0.13.0"
val decline = "0.6.2"
val snowplowTracker = "0.6.1"
val analyticsSDK = "2.0.1"
val awsSigner = "0.5.0"
val elastic4s = "6.3.6"
val pureconfig = "0.9.1"
val badRows = "2.1.0"
// Scala (test only)
val specs2 = "4.1.0"
}
Expand All @@ -53,10 +50,8 @@ object Dependencies {
val elasticsearch = "org.elasticsearch" % "elasticsearch" % V.elasticsearch
val nsqClient = "com.snowplowanalytics" % "nsq-java-client_2.10" % V.nsqClient
// Scala
val cats = "org.typelevel" %% "cats-core" % V.cats
val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect
val catsRetry = "com.github.cb372" %% "cats-retry-cats-effect" % V.catsRetry
val circeOptics = "io.circe" %% "circe-optics" % V.circeOptics
val circeOptics = "io.circe" %% "circe-optics" % V.circe
val decline = "com.monovore" %% "decline" % V.decline
val snowplowTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.snowplowTracker
val snowplowTrackerId = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-id" % V.snowplowTracker
Expand All @@ -65,6 +60,7 @@ object Dependencies {
val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig
val pureconfigEnum = "com.github.pureconfig" %% "pureconfig-enumeratum" % V.pureconfig
val elastic4sHttp = "com.sksamuel.elastic4s" %% "elastic4s-http" % V.elastic4s
val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badRows
// Scala (test only)
val circeLiteral = "io.circe" %% "circe-literal" % V.circe % Test
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
Expand Down

0 comments on commit a86eec3

Please sign in to comment.