From 8691ece9e920c5cb830d8d316c3f29bfa6bf1bc7 Mon Sep 17 00:00:00 2001 From: spenes Date: Fri, 20 Sep 2024 17:07:46 +0300 Subject: [PATCH] Bump iglu-scala-client to 4.0.0-M2 --- .../rdbloader/common/SchemaProvider.scala | 41 ++++++-------- .../common/transformation/Transformed.scala | 54 +++++++------------ project/Dependencies.scala | 2 +- 3 files changed, 36 insertions(+), 61 deletions(-) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/SchemaProvider.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/SchemaProvider.scala index 7a4ab6db2..5bf5ad532 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/SchemaProvider.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/SchemaProvider.scala @@ -11,12 +11,12 @@ package com.snowplowanalytics.snowplow.rdbloader.common import cats.{Monad, Order} -import cats.data.EitherT +import cats.data.{EitherT, NonEmptyList} import cats.effect.Clock -import cats.syntax.all._ +import io.circe.Json import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.client.{ClientError, Resolver} -import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey} +import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema} import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits.toSchema import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError @@ -40,32 +40,25 @@ object SchemaProvider { schema <- EitherT.fromOption[F](Schema.parse(json), parseSchemaBadRow(schemaKey)) } yield schema + def parseSchemaJsons[F[_]](jsons: NonEmptyList[SelfDescribingSchema[Json]]): Either[LoaderIgluError, NonEmptyList[SchemaWithKey]] = + jsons.traverse { json => + Schema + .parse(json.schema) + .toRight(parseSchemaBadRow(json.self.schemaKey)) + .map(schema => 
SchemaWithKey(json.self.schemaKey, schema)) + } + def fetchSchemasWithSameModel[F[_]: Clock: Monad: RegistryLookup]( resolver: Resolver[F], schemaKey: SchemaKey ): EitherT[F, LoaderIgluError, List[SchemaWithKey]] = - EitherT(resolver.listSchemasLike(schemaKey)) - .leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model)) - .map(_.schemas) - .flatMap(schemaKeys => - schemaKeys - .traverse(schemaKey => - getSchema(resolver, schemaKey) - .map(schema => SchemaWithKey(schemaKey, schema)) - ) - ) - - private def resolverFetchBadRow( - vendor: String, - name: String, - format: String, - model: Int - )( - e: ClientError.ResolutionError - ): LoaderIgluError = - LoaderIgluError.SchemaListNotFound(SchemaCriterion(vendor = vendor, name = name, format = format, model = model), e) + for { + jsons <- EitherT(resolver.lookupSchemasUntil(schemaKey)) + .leftMap(e => resolverBadRow(e.schemaKey)(e.error)) + schemas <- EitherT.fromEither[F](parseSchemaJsons(jsons)) + } yield schemas.toList - private def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): LoaderIgluError = + def resolverBadRow(schemaKey: SchemaKey)(e: ClientError.ResolutionError): LoaderIgluError = LoaderIgluError.IgluError(schemaKey, e) private def parseSchemaBadRow(schemaKey: SchemaKey): LoaderIgluError = diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala index 4c24fddf3..0979e1cfd 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala @@ -15,20 +15,19 @@ import cats.implicits._ import cats.data.{EitherT, NonEmptyList} import cats.effect.Clock import 
com.snowplowanalytics.iglu.client.Resolver +import com.snowplowanalytics.iglu.client.resolver.SchemaContentList import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.client.resolver.Resolver.{ResolverResult, SchemaListKey} -import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaList, SchemaMap, SelfDescribingSchema} +import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverResult +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SelfDescribingSchema} import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, FieldValue} import com.snowplowanalytics.iglu.schemaddl.redshift._ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor} import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError -import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError.SchemaListNotFound import com.snowplowanalytics.snowplow.rdbloader.common.Common.AtomicSchema import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.Shredded.ShreddedFormat import com.snowplowanalytics.snowplow.rdbloader.common.SchemaProvider -import com.snowplowanalytics.snowplow.rdbloader.common.SchemaProvider.SchemaWithKey /** Represents transformed data in blob storage */ @@ -151,25 +150,13 @@ object Transformed { def getShredModel[F[_]: Monad: Clock: RegistryLookup]( schemaKey: SchemaKey, - schemaKeys: List[SchemaKey], - resolver: Resolver[F] + schemaContentList: SchemaContentList ): EitherT[F, LoaderIgluError, ShredModel] = - schemaKeys - .traverse { sk => - SchemaProvider - .getSchema(resolver, sk) - .map(schema => SchemaWithKey(sk, schema)) - } - .flatMap { schemaWithKeyList => - EitherT - .fromOption[F][FailureDetails.LoaderIgluError, NonEmptyList[SchemaWithKey]]( - NonEmptyList.fromList(schemaWithKeyList), - 
FailureDetails.LoaderIgluError.InvalidSchema(schemaKey, s"Empty resolver response for $schemaKey") - ) - .map { nel => - val schemas = nel.map(swk => SelfDescribingSchema[Schema](SchemaMap(swk.schemaKey), swk.schema)) - foldMapRedshiftSchemas(schemas)(schemaKey) - } + EitherT + .fromEither[F](SchemaProvider.parseSchemaJsons(schemaContentList)) + .map { nel => + val schemas = nel.map(swk => SelfDescribingSchema[Schema](SchemaMap(swk.schemaKey), swk.schema)) + foldMapRedshiftSchemas(schemas)(schemaKey) } /** @@ -179,32 +166,27 @@ object Transformed { schemaKey: SchemaKey, shredModelCache: ShredModelCache[F], resolver: => Resolver[F] - ): EitherT[F, LoaderIgluError, ShredModel] = { - val criterion = SchemaCriterion(schemaKey.vendor, schemaKey.name, schemaKey.format, Some(schemaKey.version.model), None, None) - - EitherT(resolver.listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model, Some(schemaKey))) - .leftMap(error => SchemaListNotFound(criterion, error)) + ): EitherT[F, LoaderIgluError, ShredModel] = + EitherT(resolver.lookupSchemasUntilResult(schemaKey)) + .leftMap(e => SchemaProvider.resolverBadRow(e.schemaKey)(e.error)) .flatMap { - case cached: ResolverResult.Cached[SchemaListKey, SchemaList] => - lookupInCache(schemaKey, resolver, shredModelCache, cached) - case ResolverResult.NotCached(schemaList) => - val schemaKeys = schemaList.schemas - getShredModel(schemaKey, schemaKeys, resolver) + case cached: ResolverResult.Cached[SchemaKey, SchemaContentList] => + lookupInCache(schemaKey, shredModelCache, cached) + case ResolverResult.NotCached(schemaContentList) => + getShredModel(schemaKey, schemaContentList) } - } def lookupInCache[F[_]: Monad: Clock: RegistryLookup]( schemaKey: SchemaKey, - resolver: Resolver[F], shredModelCache: ShredModelCache[F], - cached: ResolverResult.Cached[SchemaListKey, SchemaList] + cached: ResolverResult.Cached[SchemaKey, SchemaContentList] ) = { val key = (schemaKey, cached.timestamp) 
EitherT.liftF(shredModelCache.get(key)).flatMap { case Some(model) => EitherT.pure[F, FailureDetails.LoaderIgluError](model) case None => - getShredModel(schemaKey, cached.value.schemas, resolver) + getShredModel[F](schemaKey, cached.value) .semiflatTap(props => shredModelCache.put(key, props)) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4fa2e8385..936163085 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,7 +15,7 @@ object Dependencies { object V { // Scala (Loader) val decline = "2.4.1" - val igluClient = "3.1.1" + val igluClient = "4.0.0-M2" val igluCore = "1.1.1" val badrows = "2.2.0" val analyticsSdk = "3.1.0"