Skip to content

Commit

Permalink
Bump iglu-scala-client to 4.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 27, 2024
1 parent 875ebbd commit 17f297a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
package com.snowplowanalytics.snowplow.rdbloader.common

import cats.{Monad, Order}
import cats.data.EitherT
import cats.data.{EitherT, NonEmptyList}
import cats.effect.Clock
import cats.syntax.all._
import io.circe.Json
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}
import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits.toSchema
import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError
Expand All @@ -40,32 +40,25 @@ object SchemaProvider {
schema <- EitherT.fromOption[F](Schema.parse(json), parseSchemaBadRow(schemaKey))
} yield schema

def parseSchemaJsons[F[_]](jsons: NonEmptyList[SelfDescribingSchema[Json]]): Either[LoaderIgluError, NonEmptyList[SchemaWithKey]] =
jsons.traverse { json =>
Schema
.parse(json.schema)
.toRight(parseSchemaBadRow(json.self.schemaKey))
.map(schema => SchemaWithKey(json.self.schemaKey, schema))
}

def fetchSchemasWithSameModel[F[_]: Clock: Monad: RegistryLookup](
resolver: Resolver[F],
schemaKey: SchemaKey
): EitherT[F, LoaderIgluError, List[SchemaWithKey]] =
EitherT(resolver.listSchemasLike(schemaKey))
.leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
.map(_.schemas)
.flatMap(schemaKeys =>
schemaKeys
.traverse(schemaKey =>
getSchema(resolver, schemaKey)
.map(schema => SchemaWithKey(schemaKey, schema))
)
)

private def resolverFetchBadRow(
vendor: String,
name: String,
format: String,
model: Int
)(
e: ClientError.ResolutionError
): LoaderIgluError =
LoaderIgluError.SchemaListNotFound(SchemaCriterion(vendor = vendor, name = name, format = format, model = model), e)
for {
jsons <- EitherT(resolver.lookupSchemasUntil(schemaKey))
.leftMap(e => resolverBadRow(e.schemaKey)(e.error))
schemas <- EitherT.fromEither[F](parseSchemaJsons(jsons))
} yield schemas.toList

private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): LoaderIgluError =
def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): LoaderIgluError =
LoaderIgluError.IgluError(schemaKey, e)

private def parseSchemaBadRow(schemaKey: SchemaKey): LoaderIgluError =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@ import cats.implicits._
import cats.data.{EitherT, NonEmptyList}
import cats.effect.Clock
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.SchemaContentList
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.resolver.Resolver.{ResolverResult, SchemaListKey}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaList, SchemaMap, SelfDescribingSchema}
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverResult
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, FieldValue}
import com.snowplowanalytics.iglu.schemaddl.redshift._
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor}
import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError
import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError.SchemaListNotFound
import com.snowplowanalytics.snowplow.rdbloader.common.Common.AtomicSchema
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.Shredded.ShreddedFormat
import com.snowplowanalytics.snowplow.rdbloader.common.SchemaProvider
import com.snowplowanalytics.snowplow.rdbloader.common.SchemaProvider.SchemaWithKey

/** Represents transformed data in blob storage */

Expand Down Expand Up @@ -151,25 +150,13 @@ object Transformed {

def getShredModel[F[_]: Monad: Clock: RegistryLookup](
schemaKey: SchemaKey,
schemaKeys: List[SchemaKey],
resolver: Resolver[F]
schemaContentList: SchemaContentList
): EitherT[F, LoaderIgluError, ShredModel] =
schemaKeys
.traverse { sk =>
SchemaProvider
.getSchema(resolver, sk)
.map(schema => SchemaWithKey(sk, schema))
}
.flatMap { schemaWithKeyList =>
EitherT
.fromOption[F][FailureDetails.LoaderIgluError, NonEmptyList[SchemaWithKey]](
NonEmptyList.fromList(schemaWithKeyList),
FailureDetails.LoaderIgluError.InvalidSchema(schemaKey, s"Empty resolver response for $schemaKey")
)
.map { nel =>
val schemas = nel.map(swk => SelfDescribingSchema[Schema](SchemaMap(swk.schemaKey), swk.schema))
foldMapRedshiftSchemas(schemas)(schemaKey)
}
EitherT
.fromEither[F](SchemaProvider.parseSchemaJsons(schemaContentList))
.map { nel =>
val schemas = nel.map(swk => SelfDescribingSchema[Schema](SchemaMap(swk.schemaKey), swk.schema))
foldMapRedshiftSchemas(schemas)(schemaKey)
}

/**
Expand All @@ -179,32 +166,27 @@ object Transformed {
schemaKey: SchemaKey,
shredModelCache: ShredModelCache[F],
resolver: => Resolver[F]
): EitherT[F, LoaderIgluError, ShredModel] = {
val criterion = SchemaCriterion(schemaKey.vendor, schemaKey.name, schemaKey.format, Some(schemaKey.version.model), None, None)

EitherT(resolver.listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model, Some(schemaKey)))
.leftMap(error => SchemaListNotFound(criterion, error))
): EitherT[F, LoaderIgluError, ShredModel] =
EitherT(resolver.lookupSchemasUntilResult(schemaKey))
.leftMap(e => SchemaProvider.resolverBadRow(e.schemaKey)(e.error))
.flatMap {
case cached: ResolverResult.Cached[SchemaListKey, SchemaList] =>
lookupInCache(schemaKey, resolver, shredModelCache, cached)
case ResolverResult.NotCached(schemaList) =>
val schemaKeys = schemaList.schemas
getShredModel(schemaKey, schemaKeys, resolver)
case cached: ResolverResult.Cached[SchemaKey, SchemaContentList] =>
lookupInCache(schemaKey, shredModelCache, cached)
case ResolverResult.NotCached(schemaContentList) =>
getShredModel(schemaKey, schemaContentList)
}
}

def lookupInCache[F[_]: Monad: Clock: RegistryLookup](
schemaKey: SchemaKey,
resolver: Resolver[F],
shredModelCache: ShredModelCache[F],
cached: ResolverResult.Cached[SchemaListKey, SchemaList]
cached: ResolverResult.Cached[SchemaKey, SchemaContentList]
) = {
val key = (schemaKey, cached.timestamp)
EitherT.liftF(shredModelCache.get(key)).flatMap {
case Some(model) =>
EitherT.pure[F, FailureDetails.LoaderIgluError](model)
case None =>
getShredModel(schemaKey, cached.value.schemas, resolver)
getShredModel[F](schemaKey, cached.value)
.semiflatTap(props => shredModelCache.put(key, props))
}
}
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object Dependencies {
object V {
// Scala (Loader)
val decline = "2.4.1"
val igluClient = "3.1.1"
val igluClient = "4.0.0-M2"
val igluCore = "1.1.1"
val badrows = "2.2.0"
val analyticsSdk = "3.1.0"
Expand Down

0 comments on commit 17f297a

Please sign in to comment.