diff --git a/build.sbt b/build.sbt index dfcb20fe..c6bc8aae 100644 --- a/build.sbt +++ b/build.sbt @@ -21,8 +21,6 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.kinesisConnector, Dependencies.Libraries.nsqClient, // Scala - Dependencies.Libraries.cats, - Dependencies.Libraries.catsEffect, Dependencies.Libraries.catsRetry, Dependencies.Libraries.circeOptics, Dependencies.Libraries.decline, @@ -32,6 +30,7 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.awsSigner, Dependencies.Libraries.pureconfig, Dependencies.Libraries.pureconfigEnum, + Dependencies.Libraries.badRows, // Scala (test only) Dependencies.Libraries.specs2, Dependencies.Libraries.circeLiteral @@ -63,9 +62,8 @@ lazy val root = project lazy val core = project .settings(moduleName := "snowplow-elasticsearch-loader-core") - .settings(buildSettings) + .settings(allSettings) .settings(BuildSettings.scalifySettings) - .settings(libraryDependencies ++= commonDependencies) // project dealing with the ES lazy val elasticsearch = project diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala index 01b9268f..fa7988c8 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/Emitter.scala @@ -166,11 +166,10 @@ class Emitter( */ override def fail(records: JList[EmitterJsonInput]): Unit = { records.asScala.foreach { - case (r: String, Validated.Invalid(fs)) => - val output = EsLoaderBadRow(r, fs).toCompactJson - badSink.store(output, None, false) + case (r, Validated.Invalid(fs)) => + val badRow = createBadRow(r, fs) + badSink.store(badRow.compact, None, false) case (_, Validated.Valid(_)) => () } } - } diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala deleted file mode 100644 index d3bf26eb..00000000 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Copyright (c) 2014-2020 Snowplow Analytics Ltd. - * All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache - * License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. - * - * See the Apache License Version 2.0 for the specific language - * governing permissions and limitations there under. - */ -package com.snowplowanalytics.stream.loader - -// cats -import cats.data.NonEmptyList - -// json4s -import io.circe.Json -import io.circe.syntax._ - -// Joda-Time -import org.joda.time.{DateTime, DateTimeZone} -import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} - -/** ES Loader rad row that could not be transformed by StdinTransformer */ -case class EsLoaderBadRow(line: String, errors: NonEmptyList[String]) { - import EsLoaderBadRow._ - - private val tstamp = System.currentTimeMillis() - // An ISO valid timestamp formatter - private val tstampFormat = DateTimeFormat - .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - .withZone(DateTimeZone.UTC) - - def toCompactJson = - Json - .obj( - "line" -> line.asJson, - "errors" -> errors.asJson, - "failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson - ) - .noSpaces -} - -object EsLoaderBadRow { - private def getTstamp(tstamp: Long, format: DateTimeFormatter): String = { - val dt = new DateTime(tstamp) - format.print(dt) - } -} diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala index 473593eb..007ede8c 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/clients/BulkSender.scala @@ -30,7 +30,8 @@ import scala.concurrent.duration._ import scala.util.Random // cats -import cats.effect.IO +import cats.effect.{ContextShift, IO} +import scala.concurrent.ExecutionContext import cats.{Applicative, Id} import cats.syntax.functor._ @@ -74,13 +75,15 @@ trait BulkSender[A] { } object BulkSender { + implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + def delayPolicy[M[_]: Applicative]( maxAttempts: Int, maxConnectionWaitTimeMs: Long): RetryPolicy[M] = RetryPolicy.lift { status => if (status.retriesSoFar >= maxAttempts) PolicyDecision.GiveUp else { - val maxDelay = 2.milliseconds * Math.pow(2, status.retriesSoFar).toLong + val maxDelay = 2.milliseconds * Math.pow(2, status.retriesSoFar.toDouble).toLong val randomDelayNanos = (maxDelay.toNanos * Random.nextDouble()).toLong val maxConnectionWaitTimeNano = maxConnectionWaitTimeMs * 1000 val delayNanos = diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/StdinExecutor.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/StdinExecutor.scala index 254c2aa8..a353f11c 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/StdinExecutor.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/StdinExecutor.scala @@ -14,7 +14,7 @@ package com.snowplowanalytics.stream.loader.executors import cats.syntax.validated._ -import com.snowplowanalytics.stream.loader.{EmitterJsonInput, EsLoaderBadRow} +import com.snowplowanalytics.stream.loader.EmitterJsonInput import com.snowplowanalytics.stream.loader.Config.{StreamLoaderConfig, StreamType} import com.snowplowanalytics.stream.loader.clients.BulkSender import com.snowplowanalytics.stream.loader.sinks.ISink @@ -23,6 +23,7 @@ import com.snowplowanalytics.stream.loader.transformers.{ EnrichedEventJsonTransformer, PlainJsonTransformer } +import com.snowplowanalytics.stream.loader.createBadRow class StdinExecutor( config: StreamLoaderConfig, @@ -43,7 +44,7 @@ class StdinExecutor( def run = for (ln <- scala.io.Source.stdin.getLines) { val (line, result) = transformer.consumeLine(ln) result.bimap( - f => badSink.store(EsLoaderBadRow(line, f).toCompactJson, None, false), + f => badSink.store(createBadRow(line, f).compact, None, false), s => goodSink match { case Some(gs) => gs.store(s.json.toString, None, true) diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/transformers/BadEventTransformer.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/transformers/BadEventTransformer.scala index 74c7c43f..ab23fe01 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/transformers/BadEventTransformer.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/transformers/BadEventTransformer.scala @@ -34,7 +34,7 @@ import cats.data.ValidatedNel import cats.syntax.validated._ import com.snowplowanalytics.iglu.core.SelfDescribingData -import com.snowplowanalytics.iglu.core.circe.instances._ +import com.snowplowanalytics.iglu.core.circe.implicits._ /** * Class to convert bad events to ElasticsearchObjects @@ -73,6 +73,7 @@ object BadEventTransformer { root.obj.modify(renameField("payload")), root.payload.raw.obj.modify(serializeField("parameters")), root.failure.obj.modify(renameField("error")), + root.failure.obj.modify(renameField("errors")), root.failure.obj.modify(renameField("message")), root.failure.messages.each.obj.modify(renameField("error")), root.failure.messages.each.obj.modify(serializeField("expectedMapping")), diff --git a/core/src/main/scala/com.snowplowanalytics.stream/package.scala b/core/src/main/scala/com.snowplowanalytics.stream/package.scala index 8061916c..2a4775d0 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/package.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/package.scala @@ -18,8 +18,12 @@ */ package com.snowplowanalytics.stream -// cats +import java.time.Instant + import cats.data.ValidatedNel +import cats.data.NonEmptyList + +import com.snowplowanalytics.snowplow.badrows._ package object loader { @@ -34,4 +38,14 @@ package object loader { * The input type for the ElasticsearchSender objects */ type EmitterJsonInput = (String, ValidatedNel[String, JsonRecord]) + + val processor = Processor(generated.Settings.name, generated.Settings.version) + + /** Create a generic bad row. */ + def createBadRow(line: String, errors: NonEmptyList[String]): BadRow.GenericError = { + val payload = Payload.RawPayload(line) + val timestamp = Instant.now() + val failure = Failure.GenericFailure(timestamp, errors) + BadRow.GenericError(processor, failure, payload) + } } diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index d9a0208e..36b7f75b 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -76,6 +76,7 @@ object BuildSettings { assemblyJarName in assembly := { s"${moduleName.value}-${version.value}.jar" }, test in assembly := {}, assemblyMergeStrategy in assembly := { + case x if x.endsWith("module-info.class") => MergeStrategy.discard // not used by JDK8 case "META-INF/io.netty.versions.properties" => MergeStrategy.first case PathList("org", "joda", "time", "base", "BaseDateTime.class") => MergeStrategy.first case x => diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e6bb6d5f..fe7a4da3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -24,19 +24,16 @@ object Dependencies { val elasticsearch = "6.3.2" val nsqClient = "1.1.0-rc1" val jackson = "2.9.6" - // Scala - val cats = "1.6.1" - val catsEffect = "1.3.1" val catsRetry = "0.2.5" - val circe = "0.11.2" - val circeOptics = "0.11.0" + val circe = "0.13.0" val decline = "0.6.2" val snowplowTracker = "0.6.1" val analyticsSDK = "2.0.1" val awsSigner = "0.5.0" val elastic4s = "6.3.6" val pureconfig = "0.9.1" + val badRows = "2.1.0" // Scala (test only) val specs2 = "4.1.0" } @@ -53,10 +50,8 @@ object Dependencies { val elasticsearch = "org.elasticsearch" % "elasticsearch" % V.elasticsearch val nsqClient = "com.snowplowanalytics" % "nsq-java-client_2.10" % V.nsqClient // Scala - val cats = "org.typelevel" %% "cats-core" % V.cats - val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect val catsRetry = "com.github.cb372" %% "cats-retry-cats-effect" % V.catsRetry - val circeOptics = "io.circe" %% "circe-optics" % V.circeOptics + val circeOptics = "io.circe" %% "circe-optics" % V.circe val decline = "com.monovore" %% "decline" % V.decline val snowplowTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.snowplowTracker val snowplowTrackerId = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-id" % V.snowplowTracker @@ -65,6 +60,7 @@ object Dependencies { val pureconfig = "com.github.pureconfig" %% "pureconfig" % V.pureconfig val pureconfigEnum = "com.github.pureconfig" %% "pureconfig-enumeratum" % V.pureconfig val elastic4sHttp = "com.sksamuel.elastic4s" %% "elastic4s-http" % V.elastic4s + val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badRows // Scala (test only) val circeLiteral = "io.circe" %% "circe-literal" % V.circe % Test val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test