Skip to content

Commit 5e88175

Browse files
istreeteroguzhanunlu
authored andcommitted
Stream transformer: Use Http4s client for iglu lookups (close #1258)
1 parent c9cd80f commit 5e88175

File tree

12 files changed

+41
-18
lines changed

12 files changed

+41
-18
lines changed

modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Processing.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import fs2.compression.{Compression => FS2Compression}
2929
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
3030

3131
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Processor}
32+
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
3233

3334
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
3435
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
@@ -69,6 +70,7 @@ object Processing {
6970
config: Config,
7071
processor: Processor
7172
): Stream[F, Unit] = {
73+
implicit val lookup: RegistryLookup[F] = resources.registryLookup
7274
val transformer: Transformer[F] = config.formats match {
7375
case f: TransformerConfig.Formats.Shred =>
7476
Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor)
@@ -117,7 +119,7 @@ object Processing {
117119
}
118120

119121
/** Build a sink according to settings and pass it through `generic.Partitioned` */
120-
def getSink[F[_]: Async, C: Monoid](
122+
def getSink[F[_]: Async: RegistryLookup, C: Monoid](
121123
resources: Resources[F, C],
122124
config: Config.Output,
123125
formats: Formats

modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.typelevel.log4cats.Logger
2121

2222
import io.circe.Json
2323

24-
import cats.Applicative
24+
import cats.{Applicative, Monad, MonadThrow}
2525
import cats.implicits._
2626
import cats.effect._
2727
import cats.effect.std.Random
@@ -59,7 +59,8 @@ case class Resources[F[_], C](
5959
inputStream: Queue.Consumer[F],
6060
checkpointer: Queue.Consumer.Message[F] => C,
6161
blobStorage: BlobStorage[F],
62-
badSink: BadSink[F]
62+
badSink: BadSink[F],
63+
registryLookup: RegistryLookup[F]
6364
)
6465

6566
object Resources {
@@ -114,7 +115,8 @@ object Resources {
114115
inputStream,
115116
checkpointer,
116117
blobStorage,
117-
badSink
118+
badSink,
119+
registryLookup
118120
)
119121

120122
private def mkBadSink[F[_]: Applicative](
@@ -145,17 +147,21 @@ object Resources {
145147
case Left(error) => Sync[F].raiseError[Resolver[F]](error)
146148
}
147149
}
148-
private def mkEventParser[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] = Resource.eval {
149-
mkAtomicLengths(igluResolver, config).flatMap {
150-
case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths))
151-
case Left(error) => Sync[F].raiseError[EventParser](error)
150+
private def mkEventParser[F[_]: MonadThrow: Clock: RegistryLookup](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] =
151+
Resource.eval {
152+
mkAtomicLengths(igluResolver, config).flatMap {
153+
case Right(atomicLengths) => Monad[F].pure(Event.parser(atomicLengths))
154+
case Left(error) => MonadThrow[F].raiseError[EventParser](error)
155+
}
152156
}
153-
}
154-
private def mkAtomicLengths[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): F[Either[RuntimeException, Map[String, Int]]] =
157+
private def mkAtomicLengths[F[_]: Monad: Clock: RegistryLookup](
158+
igluResolver: Resolver[F],
159+
config: Config
160+
): F[Either[RuntimeException, Map[String, Int]]] =
155161
if (config.featureFlags.truncateAtomicFields) {
156162
EventUtils.getAtomicLengths(igluResolver)
157163
} else {
158-
Sync[F].pure(Right(Map.empty[String, Int]))
164+
Monad[F].pure(Right(Map.empty[String, Int]))
159165
}
160166

161167
private def mkTransformerInstanceId[F[_]: Sync] =

modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/parquet/ParquetSink.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.parquet
1616

1717
import cats.data.EitherT
18-
import cats.effect.Async
18+
import cats.Monad
19+
import cats.effect.{Async, Clock}
1920
import cats.implicits._
2021
import com.github.mjakubowski84.parquet4s.{ParquetWriter, Path, RowParquetRecord}
2122
import com.github.mjakubowski84.parquet4s.parquet.viaParquet
23+
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
2224
import com.snowplowanalytics.snowplow.analytics.scalasdk.Data
2325
import com.snowplowanalytics.snowplow.badrows.FailureDetails
2426
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow
@@ -39,7 +41,7 @@ import java.net.URI
3941

4042
object ParquetSink {
4143

42-
def parquetSink[F[_]: Async, C](
44+
def parquetSink[F[_]: Async: RegistryLookup, C](
4345
resources: Resources[F, C],
4446
compression: Compression,
4547
maxRecordsPerFile: Long,
@@ -66,7 +68,7 @@ object ParquetSink {
6668
}
6769
}
6870

69-
private def createSchemaFromTypes[F[_]: Async, C](
71+
private def createSchemaFromTypes[F[_]: Monad: Clock: RegistryLookup, C](
7072
resources: Resources[F, C],
7173
types: List[Data.ShreddedType]
7274
): EitherT[F, FailureDetails.LoaderIgluError, MessageType] =

modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/sinks/TransformingSpec.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks
1616

17-
import cats.effect.IO
17+
import cats.effect.{IO, Resource}
1818
import cats.effect.unsafe.implicits.global
1919

2020
import java.time.Instant
@@ -24,8 +24,9 @@ import fs2.{Stream, text}
2424
import io.circe.Json
2525
import io.circe.optics.JsonPath._
2626
import io.circe.parser.{parse => parseCirce}
27+
import org.http4s.client.{Client => Http4sClient}
2728
import com.snowplowanalytics.iglu.client.Resolver
28-
import com.snowplowanalytics.iglu.client.resolver.registries.Registry
29+
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, Registry, RegistryLookup}
2930
import com.snowplowanalytics.iglu.schemaddl.Properties
3031
import com.snowplowanalytics.lrumap.CreateLruMap
3132
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
@@ -118,6 +119,12 @@ object TransformingSpec {
118119
def data: Transformed.Data = value._2
119120
}
120121

122+
implicit val registryLookup: RegistryLookup[IO] = Http4sRegistryLookup {
123+
Http4sClient[IO] { _ =>
124+
Resource.eval(IO.raiseError(new RuntimeException("Unexpected registry lookup")))
125+
}
126+
}
127+
121128
val VersionPlaceholder = "version_placeholder"
122129
val BadPathPrefix = "output=bad"
123130
val DefaultTimestamp = "2020-09-29T10:38:56.653Z"

modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ package object common {
4848

4949
def isInputError(clientError: ClientError): Boolean =
5050
clientError match {
51-
case ClientError.ValidationError(_) =>
51+
case ClientError.ValidationError(_, _) =>
5252
false
5353
case ClientError.ResolutionError(map) =>
5454
map.values.toList.flatMap(_.errors.toList).exists {

modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/ParquetFieldsProviderSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.snowplowanalytics.snowplow.rdbloader.common
22

33
import cats.Id
44
import com.snowplowanalytics.iglu.client.Client
5+
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
56
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
67
import com.snowplowanalytics.iglu.schemaddl.parquet.Type.Nullability.{Nullable, Required}
78
import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}

modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/MonitoringSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import com.snowplowanalytics.iglu.core.SelfDescribingData
2727
import com.snowplowanalytics.iglu.core.circe.implicits._
2828

2929
import com.snowplowanalytics.iglu.client.Client
30+
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
3031
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
3132
import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload
3233
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo

modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJob.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.batch
1717
import cats.Id
1818
import cats.implicits._
1919
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
20+
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
2021
import com.snowplowanalytics.iglu.client.{Client, Resolver}
2122
import com.snowplowanalytics.iglu.client.validator.CirceValidator
2223
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage

modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.batch
1717
import cats.Id
1818
import cats.implicits._
1919
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
20+
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
2021
import com.snowplowanalytics.iglu.schemaddl.parquet.FieldValue
2122
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.ParquetTransformer
2223
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields

modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import cats.syntax.show._
2121

2222
import com.snowplowanalytics.iglu.client.Resolver
2323
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
24+
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
2425
import com.snowplowanalytics.iglu.schemaddl.Properties
2526

2627
import com.snowplowanalytics.lrumap.CreateLruMap

modules/transformer-batch/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/ShredJobSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.batch
1616

1717
import cats.Id
1818
import com.snowplowanalytics.iglu.client.Resolver
19+
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
1920
import com.snowplowanalytics.snowplow.rdbloader.common.catsClockIdInstance
2021
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{AtomicFieldsProvider, NonAtomicFieldsProvider}
2122
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields

project/Dependencies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ object Dependencies {
1717
object V {
1818
// Scala (Loader)
1919
val decline = "2.4.1"
20-
val igluClient = "3.0.0-M1"
20+
val igluClient = "3.0.0"
2121
val igluCore = "1.1.1"
2222
val badrows = "2.2.0"
2323
val analyticsSdk = "3.1.0"

0 commit comments

Comments
 (0)