From f63fa3d1fcc0180964b738b2b13b5ea9aa8ae2b9 Mon Sep 17 00:00:00 2001
From: spenes
Date: Tue, 11 Oct 2022 01:30:51 +0300
Subject: [PATCH] Loader: add telemetry (close #617)

---
 .../aws/databricks.config.reference.hocon     |  30 +++++
 .../aws/redshift.config.reference.hocon       |  30 +++++
 .../aws/snowflake.config.reference.hocon      |  30 +++++
 .../gcp/snowflake.config.reference.hocon      |  30 +++++
 .../transformer/stream/common/Config.scala    |  20 +---
 .../transformer/stream/common/Resources.scala |  28 ++++-
 .../common/telemetry/Telemetry.scala          | 107 +++++++++---------
 .../snowplow/loader/databricks/Main.scala     |   6 +-
 .../loader/databricks/DatabricksSpec.scala    |   5 +-
 .../src/main/resources/application.conf       |   8 ++
 .../snowplow/rdbloader/Loader.scala           |  12 +-
 .../snowplow/rdbloader/Runner.scala           |  13 ++-
 .../snowplow/rdbloader/config/Config.scala    |  14 ++-
 .../snowplow/rdbloader/dsl/Environment.scala  |  37 +++++-
 .../snowplow/rdbloader/ConfigSpec.scala       |  33 +++++-
 .../snowplow/rdbloader/SpecHelpers.scala      |   3 +-
 .../snowplow/loader/redshift/Main.scala       |   6 +-
 .../snowplow/loader/redshift/ConfigSpec.scala |   6 +-
 .../snowplow/loader/snowflake/Main.scala      |   6 +-
 .../loader/snowflake/ConfigSpec.scala         |  12 +-
 .../stream/kinesis/ConfigSpec.scala           |   5 +-
 .../stream/pubsub/ConfigSpec.scala            |   5 +-
 project/Dependencies.scala                    |   8 +-
 23 files changed, 350 insertions(+), 104 deletions(-)
 rename modules/{common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream => common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader}/common/telemetry/Telemetry.scala (63%)

diff --git a/config/loader/aws/databricks.config.reference.hocon b/config/loader/aws/databricks.config.reference.hocon
index 4615189a2..31aa49370 100644
--- a/config/loader/aws/databricks.config.reference.hocon
+++ b/config/loader/aws/databricks.config.reference.hocon
@@ -227,4 +227,34 @@
     #   the next time it will get this (or anything) from a queue has this delay
     "sqsVisibility": "5 minutes"
   }
+
+  # Optional. Configure telemetry
+  # All the fields are optional
+  "telemetry": {
+    # Set to true to disable telemetry
+    "disable": false
+    # Interval for the heartbeat event
+    "interval": 15 minutes
+    # HTTP method used to send the heartbeat event
+    "method": "POST"
+    # URI of the collector receiving the heartbeat event
+    "collectorUri": "collector-g.snowplowanalytics.com"
+    # Port of the collector receiving the heartbeat event
+    "collectorPort": 443
+    # Whether to use https or not
+    "secure": true
+    # Identifier intended to tie events together across modules,
+    # infrastructure and apps when used consistently
+    "userProvidedId": "my_pipeline"
+    # ID automatically generated upon running a module's deployment script
+    # Intended to identify each independent module, and the infrastructure it controls
+    "autoGeneratedId": "hfy67e5ydhtrd"
+    # Unique identifier for the VM instance
+    # Unique for each instance of the app running within a module
+    "instanceId": "665bhft5u6udjf"
+    # Name of the terraform module that deployed the app
+    "moduleName": "rdb-loader-ce"
+    # Version of the terraform module that deployed the app
+    "moduleVersion": "1.0.0"
+  }
 }
diff --git a/config/loader/aws/redshift.config.reference.hocon b/config/loader/aws/redshift.config.reference.hocon
index 22706c102..9cc186094 100644
--- a/config/loader/aws/redshift.config.reference.hocon
+++ b/config/loader/aws/redshift.config.reference.hocon
@@ -204,4 +204,34 @@
     #   the next time it will get this (or anything) from a queue has this delay
     "sqsVisibility": "5 minutes"
   }
+
+  # Optional. Configure telemetry
+  # All the fields are optional
+  "telemetry": {
+    # Set to true to disable telemetry
+    "disable": false
+    # Interval for the heartbeat event
+    "interval": 15 minutes
+    # HTTP method used to send the heartbeat event
+    "method": "POST"
+    # URI of the collector receiving the heartbeat event
+    "collectorUri": "collector-g.snowplowanalytics.com"
+    # Port of the collector receiving the heartbeat event
+    "collectorPort": 443
+    # Whether to use https or not
+    "secure": true
+    # Identifier intended to tie events together across modules,
+    # infrastructure and apps when used consistently
+    "userProvidedId": "my_pipeline"
+    # ID automatically generated upon running a module's deployment script
+    # Intended to identify each independent module, and the infrastructure it controls
+    "autoGeneratedId": "hfy67e5ydhtrd"
+    # Unique identifier for the VM instance
+    # Unique for each instance of the app running within a module
+    "instanceId": "665bhft5u6udjf"
+    # Name of the terraform module that deployed the app
+    "moduleName": "rdb-loader-ce"
+    # Version of the terraform module that deployed the app
+    "moduleVersion": "1.0.0"
+  }
 }
diff --git a/config/loader/aws/snowflake.config.reference.hocon b/config/loader/aws/snowflake.config.reference.hocon
index 1c72d4082..5ad667cf0 100644
--- a/config/loader/aws/snowflake.config.reference.hocon
+++ b/config/loader/aws/snowflake.config.reference.hocon
@@ -242,4 +242,34 @@
     #   the next time it will get this (or anything) from a queue has this delay
     "sqsVisibility": "5 minutes"
   }
+
+  # Optional. Configure telemetry
+  # All the fields are optional
+  "telemetry": {
+    # Set to true to disable telemetry
+    "disable": false
+    # Interval for the heartbeat event
+    "interval": 15 minutes
+    # HTTP method used to send the heartbeat event
+    "method": "POST"
+    # URI of the collector receiving the heartbeat event
+    "collectorUri": "collector-g.snowplowanalytics.com"
+    # Port of the collector receiving the heartbeat event
+    "collectorPort": 443
+    # Whether to use https or not
+    "secure": true
+    # Identifier intended to tie events together across modules,
+    # infrastructure and apps when used consistently
+    "userProvidedId": "my_pipeline"
+    # ID automatically generated upon running a module's deployment script
+    # Intended to identify each independent module, and the infrastructure it controls
+    "autoGeneratedId": "hfy67e5ydhtrd"
+    # Unique identifier for the VM instance
+    # Unique for each instance of the app running within a module
+    "instanceId": "665bhft5u6udjf"
+    # Name of the terraform module that deployed the app
+    "moduleName": "rdb-loader-ce"
+    # Version of the terraform module that deployed the app
+    "moduleVersion": "1.0.0"
+  }
 }

diff --git a/config/loader/gcp/snowflake.config.reference.hocon b/config/loader/gcp/snowflake.config.reference.hocon
index 8e962baf3..5c77ab4e6 100644
--- a/config/loader/gcp/snowflake.config.reference.hocon
+++ b/config/loader/gcp/snowflake.config.reference.hocon
@@ -208,4 +208,34 @@
     #   before considering Snowflake unhealthy
     "nonLoading": "10 minutes"
   }
+
+  # Optional. Configure telemetry
+  # All the fields are optional
+  "telemetry": {
+    # Set to true to disable telemetry
+    "disable": false
+    # Interval for the heartbeat event
+    "interval": 15 minutes
+    # HTTP method used to send the heartbeat event
+    "method": "POST"
+    # URI of the collector receiving the heartbeat event
+    "collectorUri": "collector-g.snowplowanalytics.com"
+    # Port of the collector receiving the heartbeat event
+    "collectorPort": 443
+    # Whether to use https or not
+    "secure": true
+    # Identifier intended to tie events together across modules,
+    # infrastructure and apps when used consistently
+    "userProvidedId": "my_pipeline"
+    # ID automatically generated upon running a module's deployment script
+    # Intended to identify each independent module, and the infrastructure it controls
+    "autoGeneratedId": "hfy67e5ydhtrd"
+    # Unique identifier for the VM instance
+    # Unique for each instance of the app running within a module
+    "instanceId": "665bhft5u6udjf"
+    # Name of the terraform module that deployed the app
+    "moduleName": "rdb-loader-ce"
+    # Version of the terraform module that deployed the app
+    "moduleVersion": "1.0.0"
+  }
 }
diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala
index 4d5d4125a..ec51e704c 100644
--- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala
+++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala
@@ -23,6 +23,7 @@ import io.circe.generic.semiauto._
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.config.{ConfigUtils, TransformerConfig}
 import com.snowplowanalytics.snowplow.rdbloader.common.config.implicits._
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
@@ -35,7 +36,7 @@
 final case class Config(input: Config.StreamInput,
                         queue: Config.QueueConfig,
                         formats: TransformerConfig.Formats,
                         monitoring: Config.Monitoring,
-                        telemetry: Config.Telemetry,
+                        telemetry: Telemetry.Config,
                         featureFlags: TransformerConfig.FeatureFlags,
                         validations: TransformerConfig.Validations)
@@ -155,23 +156,6 @@ object Config {
       deriveDecoder[MetricsReporters]
   }
 
-  case class Telemetry(
-    disable: Boolean,
-    interval: FiniteDuration,
-    method: String,
-    collectorUri: String,
-    collectorPort: Int,
-    secure: Boolean,
-    userProvidedId: Option[String],
-    autoGeneratedId: Option[String],
-    instanceId: Option[String],
-    moduleName: Option[String],
-    moduleVersion: Option[String]
-  )
-
-  implicit val telemetryDecoder: Decoder[Telemetry] =
-    deriveDecoder[Telemetry]
-
   trait Decoders extends TransformerConfig.Decoders {
 
     implicit val streamInputConfigDecoder: Decoder[StreamInput] =
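With the case class and decoder relocated to common, the decoder can be exercised on its own. A minimal sketch, assuming circe-parser on the classpath and that the FiniteDuration decoder from common's config.implicits (baked into the derived decoder above) accepts strings like "15 minutes":

    import io.circe.parser.decode
    import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry

    // telemetryDecoder lives in the enclosing Telemetry object, so it is
    // resolved through implicit scope without an extra import.
    val telemetry: Either[io.circe.Error, Telemetry.Config] =
      decode[Telemetry.Config](
        """{
          "disable": false,
          "interval": "15 minutes",
          "method": "POST",
          "collectorUri": "collector-g.snowplowanalytics.com",
          "collectorPort": 443,
          "secure": true
        }"""
      )
    // The five identifier fields (userProvidedId, autoGeneratedId, instanceId,
    // moduleName, moduleVersion) are Options and may simply be left out.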
diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala
index 1cfbae518..c991c47df 100644
--- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala
+++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Resources.scala
@@ -24,14 +24,16 @@ import io.circe.Json
 import cats.implicits._
 import cats.effect._
 
+import org.http4s.client.blaze.BlazeClientBuilder
+
 import com.snowplowanalytics.iglu.client.Client
 import com.snowplowanalytics.iglu.client.resolver.{InitListCache, InitSchemaCache, Resolver}
 
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, BlobStorage}
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils
 
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.Checkpointer
 
 case class Resources[F[_], C](
@@ -72,7 +74,16 @@ object Resources {
       instanceId <- mkTransformerInstanceId
       blocker <- Blocker[F]
       metrics <- Resource.eval(Metrics.build[F](blocker, config.monitoring.metrics))
-      telemetry <- Telemetry.build[F](config, buildName, buildVersion, executionContext)
+      httpClient <- BlazeClientBuilder[F](executionContext).resource
+      telemetry <- Telemetry.build[F](
+        config.telemetry,
+        buildName,
+        buildVersion,
+        httpClient,
+        AppId.appId,
+        getRegionFromConfig(config),
+        getCloudFromConfig(config)
+      )
       inputStream <- mkSource(blocker, config.input, config.monitoring)
       blobStorage <- mkSink(blocker, config.output)
     } yield
@@ -112,4 +123,17 @@ object Resources {
       .eval(Sync[F].delay(UUID.randomUUID()))
       .evalTap(id => logger.info(s"Instantiated $id shredder instance"))
   }
+
+  private def getRegionFromConfig(config: Config): Option[String] =
+    config.input match {
+      case c: Config.StreamInput.Kinesis => Some(c.region.name)
+      case _ => None
+    }
+
+  private def getCloudFromConfig(config: Config): Option[Telemetry.Cloud] =
+    config.input match {
+      case _: Config.StreamInput.Kinesis => Some(Telemetry.Cloud.Aws)
+      case _: Config.StreamInput.Pubsub => Some(Telemetry.Cloud.Gcp)
+      case _ => None
+    }
 }
diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/telemetry/Telemetry.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/telemetry/Telemetry.scala
similarity index 63%
rename from modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/telemetry/Telemetry.scala
rename to modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/telemetry/Telemetry.scala
index f1829e055..5e8591a20 100644
--- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/telemetry/Telemetry.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/telemetry/Telemetry.scala
@@ -12,9 +12,9 @@
  * See the Apache License Version 2.0 for the specific language governing permissions and
  * limitations there under.
  */
-package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.telemetry
+package com.snowplowanalytics.snowplow.rdbloader.common.telemetry
 
-import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
 
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
@@ -27,10 +27,9 @@ import fs2.Stream
 
 import org.http4s.client.{Client => HttpClient}
-import org.http4s.client.blaze.BlazeClientBuilder
 
-import io.circe.Json
-import io.circe.Encoder
+import io.circe._
+import io.circe.generic.semiauto._
 import io.circe.syntax._
 
 import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
 
@@ -40,9 +39,7 @@ import com.snowplowanalytics.snowplow.scalatracker.Emitter._
 import com.snowplowanalytics.snowplow.scalatracker.Emitter.{Result => TrackerResult}
 import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter
 
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.{Telemetry => TelemetryConfig}
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Config.StreamInput
+import com.snowplowanalytics.snowplow.rdbloader.common.config.implicits._
 
 trait Telemetry[F[_]] {
   def report: Stream[F, Unit]
@@ -54,37 +51,40 @@ object Telemetry {
     Slf4jLogger.getLogger[F]
 
   def build[F[_]: ConcurrentEffect: Timer](
-    config: Config,
+    telemetryConfig: Config,
     appName: String,
     appVersion: String,
-    executionContext: ExecutionContext
+    httpClient: HttpClient[F],
+    appGeneratedId: String,
+    region: Option[String],
+    cloud: Option[Cloud]
   ): Resource[F, Telemetry[F]] =
-    for {
-      httpClient <- BlazeClientBuilder[F](executionContext).resource
-      tracker <- initTracker(config.telemetry, appName, httpClient)
-    } yield new Telemetry[F] {
-      def report: Stream[F, Unit] =
-        if (config.telemetry.disable) {
-          Stream.empty.covary[F]
-        } else {
-          val sdj = makeHeartbeatEvent(
-            config.telemetry,
-            getRegionFromConfig(config),
-            getCloudFromConfig(config),
-            appName,
-            appVersion
-          )
-          Stream
-            .fixedDelay[F](config.telemetry.interval)
-            .evalMap { _ =>
-              tracker.trackSelfDescribingEvent(unstructEvent = sdj) >>
-                tracker.flushEmitters()
-            }
-        }
-    }
+    initTracker(telemetryConfig, appName, httpClient).map { tracker =>
+      new Telemetry[F] {
+        def report: Stream[F, Unit] =
+          if (telemetryConfig.disable) {
+            Stream.empty.covary[F]
+          } else {
+            val sdj = makeHeartbeatEvent(
+              telemetryConfig,
+              appGeneratedId,
+              region,
+              cloud,
+              appName,
+              appVersion
+            )
+            Stream
+              .fixedDelay[F](telemetryConfig.interval)
+              .evalMap { _ =>
+                tracker.trackSelfDescribingEvent(unstructEvent = sdj) >>
+                  tracker.flushEmitters()
+              }
+          }
+      }
+    }
 
   private def initTracker[F[_]: ConcurrentEffect: Timer](
-    config: TelemetryConfig,
+    config: Config,
     appName: String,
     client: HttpClient[F]
   ): Resource[F, Tracker[F]] =
@@ -106,17 +106,18 @@
       case TrackerResult.Success(_) =>
         Logger[F].debug(s"Telemetry heartbeat successfully sent to ${params.getGetUri}")
       case TrackerResult.Failure(code) =>
-        Logger[F].warn(s"Sending telemetry hearbeat got unexpected HTTP code $code from ${params.getUri}")
+        Logger[F].warn(s"Sending telemetry heartbeat got unexpected HTTP code $code from ${params.getUri}")
       case TrackerResult.TrackerFailure(exception) =>
         Logger[F].warn(
-          s"Telemetry hearbeat failed to reach ${params.getUri} with following exception $exception after ${req.attempt} attempts"
+          s"Telemetry heartbeat failed to reach ${params.getUri} with following exception $exception after ${req.attempt} attempts"
         )
       case TrackerResult.RetriesExceeded(failure) =>
        Logger[F].error(s"Stopped trying to send telemetry heartbeat after following failure: $failure")
    }
 
   private def makeHeartbeatEvent(
-    teleCfg: TelemetryConfig,
+    teleCfg: Config,
+    appGeneratedId: String,
     region: Option[String],
     cloud: Option[Cloud],
     appName: String,
@@ -130,7 +131,7 @@
         "moduleName" -> teleCfg.moduleName.asJson,
         "moduleVersion" -> teleCfg.moduleVersion.asJson,
         "instanceId" -> teleCfg.instanceId.asJson,
-        "appGeneratedId" -> com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId.appId.asJson,
+        "appGeneratedId" -> appGeneratedId.asJson,
         "cloud" -> cloud.asJson,
         "region" -> region.asJson,
         "applicationName" -> appName.asJson,
@@ -138,25 +139,29 @@ object Telemetry {
       )
     )
 
-  private def getRegionFromConfig(config: Config): Option[String] =
-    config.input match {
-      case c: StreamInput.Kinesis => Some(c.region.name)
-      case _ => None
-    }
-
-  private def getCloudFromConfig(config: Config): Option[Cloud] =
-    config.input match {
-      case _: StreamInput.Kinesis => Some(Cloud.Aws)
-      case _: StreamInput.Pubsub => Some(Cloud.Pubsub)
-      case _ => None
-    }
-
   sealed trait Cloud
 
   object Cloud {
     case object Aws extends Cloud
-    case object Pubsub extends Cloud
+    case object Gcp extends Cloud
 
     implicit val encoder: Encoder[Cloud] =
      Encoder.encodeString.contramap[Cloud](_.toString.toUpperCase)
   }
+
+  case class Config(
+    disable: Boolean,
+    interval: FiniteDuration,
+    method: String,
+    collectorUri: String,
+    collectorPort: Int,
+    secure: Boolean,
+    userProvidedId: Option[String],
+    autoGeneratedId: Option[String],
+    instanceId: Option[String],
+    moduleName: Option[String],
+    moduleVersion: Option[String]
+  )
+
+  implicit val telemetryDecoder: Decoder[Config] =
+    deriveDecoder[Config]
 }
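The reporting loop above is plain fs2: each `interval` tick tracks one heartbeat and flushes the emitter, and `disable` collapses the stream to empty. A stripped-down sketch of the same pattern, with a stub effect standing in for the tracker calls (hypothetical names, cats-effect 2 / fs2 2 style as used in this project):

    import scala.concurrent.duration._
    import cats.effect.{ExitCode, IO, IOApp}
    import fs2.Stream

    object HeartbeatSketch extends IOApp {
      // Stand-in for tracker.trackSelfDescribingEvent(...) >> tracker.flushEmitters()
      val sendHeartbeat: IO[Unit] = IO(println("heartbeat sent"))

      def report(disable: Boolean, interval: FiniteDuration): Stream[IO, Unit] =
        if (disable) Stream.empty.covary[IO]  // "disable": true yields no events
        else Stream.fixedDelay[IO](interval).evalMap(_ => sendHeartbeat)

      def run(args: List[String]): IO[ExitCode] =
        report(disable = false, interval = 1.second)
          .take(3)  // bounded here; the loader merges it into its main stream instead
          .compile.drain.as(ExitCode.Success)
    }

Because `report` is merged into the application's main stream rather than run on its own, the heartbeat stops with the app and never blocks loading.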
diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala
index f7a11d971..f5ab4c05d 100644
--- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala
+++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala
@@ -19,5 +19,9 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner
 
 object Main extends IOApp {
   def run(args: List[String]): IO[ExitCode] =
-    Runner.run[IO](args, Databricks.build)
+    Runner.run[IO](
+      args,
+      Databricks.build,
+      "rdb-loader-databricks"
+    )
 }

diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala
index 1d60227e9..7a7700096 100644
--- a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala
+++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala
@@ -23,6 +23,8 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
 import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnName, ColumnsToCopy, ColumnsToSkip}
 import com.snowplowanalytics.snowplow.rdbloader.db.{Statement, Target}
 import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod
+import com.snowplowanalytics.snowplow.rdbloader.ConfigSpec._
+
 import scala.concurrent.duration.DurationInt
 
 import org.specs2.mutable.Specification
@@ -143,7 +145,8 @@ object DatabricksSpec {
     Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
     Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
     Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
-    Config.FeatureFlags(addLoadTstampColumn = true)
+    Config.FeatureFlags(addLoadTstampColumn = true),
+    exampleTelemetry
   )).right.get
 
 }

diff --git a/modules/loader/src/main/resources/application.conf b/modules/loader/src/main/resources/application.conf
index 9e8c70d20..61195f3dc 100644
--- a/modules/loader/src/main/resources/application.conf
+++ b/modules/loader/src/main/resources/application.conf
@@ -35,4 +35,12 @@
   "featureFlags": {
     "addLoadTstampColumn": true
   }
+  "telemetry": {
+    "disable": false
+    "interval": 15 minutes
+    "method": POST
+    "collectorUri": collector-g.snowplowanalytics.com
+    "collectorPort": 443
+    "secure": true
+  }
 }
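These keys give every loader a complete default telemetry section, so a user-supplied file only has to override what it changes (for example just userProvidedId, or disable). A sketch of that layering with the raw Typesafe Config API, assuming ConfigUtils falls back to the bundled application.conf in roughly this way:

    import com.typesafe.config.ConfigFactory

    val userFile = ConfigFactory.parseString(
      """telemetry { userProvidedId = "my_pipeline" }"""
    )
    // application.conf on the classpath supplies interval, method, collectorUri,
    // collectorPort and secure; the user file wins wherever both define a key.
    val merged = userFile.withFallback(ConfigFactory.load()).resolve()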
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala
index b9fd20a26..f939e27d3 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala
@@ -13,13 +13,17 @@
 package com.snowplowanalytics.snowplow.rdbloader
 
 import scala.concurrent.duration._
+
 import cats.{Applicative, Apply, Monad}
 import cats.implicits._
 import cats.effect.{Clock, Concurrent, ContextShift, MonadThrow, Timer}
-import com.snowplowanalytics.snowplow.rdbloader.cloud.{JsonPathDiscovery, LoadAuthService}
-import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
+
 import retry._
+
 import fs2.Stream
+
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
+import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
 import com.snowplowanalytics.snowplow.rdbloader.db.Columns._
 import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, HealthCheck, Manifest, Statement, Control => DbControl}
@@ -28,6 +32,7 @@ import com.snowplowanalytics.snowplow.rdbloader.dsl.{Cache, DAO, FolderMonitorin
 import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload
 import com.snowplowanalytics.snowplow.rdbloader.loading.{EventsTable, Load, Stage, TargetCheck, Retry}
 import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._
+import com.snowplowanalytics.snowplow.rdbloader.cloud.{JsonPathDiscovery, LoadAuthService}
 import com.snowplowanalytics.snowplow.rdbloader.state.{Control, MakeBusy}
 
 object Loader {
@@ -54,7 +59,7 @@ object Loader {
    * claim `A` is needed and `C[A]` later can be materialized into `F[A]`
    */
  def run[F[_]: Transaction[*[_], C]: Concurrent: BlobStorage: Queue.Consumer: Clock: Iglu: Cache: Logging: Timer: Monitoring: ContextShift: LoadAuthService: JsonPathDiscovery,
-         C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]): F[Unit] = {
+         C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F], telemetry: Telemetry[F]): F[Unit] = {
     val folderMonitoring: Stream[F, Unit] =
       FolderMonitoring.run[F, C](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy)
     val noOpScheduling: Stream[F, Unit] =
@@ -90,6 +95,7 @@ object Loader {
         .merge(healthCheck)
         .merge(stateLogging)
         .merge(periodicMetrics)
+        .merge(telemetry.report)
     }
 
     process
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala
index 576abe04e..1cc94086f 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala
@@ -29,17 +29,24 @@ import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
 
 /** Generic starting point for all loaders */
 object Runner {
 
-  def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel](argv: List[String], buildStatements: BuildTarget): F[ExitCode] = {
+  def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel](argv: List[String],
+                                                                        buildStatements: BuildTarget,
+                                                                        appName: String): F[ExitCode] = {
     val result = for {
       parsed <- CliConfig.parse[F](argv)
       statements <- EitherT.fromEither[F](buildStatements(parsed.config))
       application = {
-        Environment.initialize[F](parsed, statements).use { env: Environment[F] =>
+        Environment.initialize[F](
+          parsed,
+          statements,
+          appName,
+          generated.BuildInfo.version
+        ).use { env: Environment[F] =>
           import env._
           Logging[F]
             .info(s"RDB Loader ${generated.BuildInfo.version} has started.") *>
-            Loader.run[F, ConnectionIO](parsed.config, env.controlF).as(ExitCode.Success)
+            Loader.run[F, ConnectionIO](parsed.config, env.controlF, env.telemetryF).as(ExitCode.Success)
         }
       }
       exitCode <- EitherT.liftF[F, String, ExitCode](application)
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala
index 2995fd3a8..26116b03c 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala
@@ -13,18 +13,25 @@
 package com.snowplowanalytics.snowplow.rdbloader.config
 
 import java.net.URI
+
 import scala.concurrent.duration.{Duration, FiniteDuration}
+
 import cats.effect.Sync
 import cats.data.EitherT
 import cats.syntax.either._
-import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
+
 import io.circe._
 import io.circe.generic.semiauto._
+
 import org.http4s.{ParseFailure, Uri}
+
 import cron4s.CronExpr
 import cron4s.circe._
-import com.snowplowanalytics.snowplow.rdbloader.config.Config._
+
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
+import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
 import com.snowplowanalytics.snowplow.rdbloader.common.config.{ConfigUtils, Region}
+import com.snowplowanalytics.snowplow.rdbloader.config.Config._
 
 
 /**
@@ -42,7 +49,8 @@ case class Config[+D <: StorageTarget](
   retries: Retries,
   readyCheck: Retries,
   initRetries: Retries,
-  featureFlags: FeatureFlags
+  featureFlags: FeatureFlags,
+  telemetry: Telemetry.Config
 )
 
 object Config {

diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala
index fe5f00ca5..c943e2c21 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala
@@ -29,6 +29,7 @@ import io.sentry.{Sentry, SentryClient, SentryOptions}
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue, SecretStore}
 import com.snowplowanalytics.snowplow.rdbloader.aws.{EC2ParameterStore, S3, SQS}
 import com.snowplowanalytics.snowplow.rdbloader.gcp.{GCS, Pubsub, SecretManager}
@@ -55,7 +56,8 @@ class Environment[F[_]](cache: Cache[F],
                         transaction: Transaction[F, ConnectionIO],
                         target: Target,
                         timeouts: Config.Timeouts,
-                        control: Control[F]) {
+                        control: Control[F],
+                        telemetry: Telemetry[F]) {
   implicit val cacheF: Cache[F] = cache
   implicit val loggingF: Logging[F] = logging
   implicit val monitoringF: Monitoring[F] = monitoring
@@ -69,6 +71,7 @@ class Environment[F[_]](cache: Cache[F],
   implicit val daoC: DAO[ConnectionIO] = DAO.connectionIO(target, timeouts)
   implicit val loggingC: Logging[ConnectionIO] = logging.mapK(transaction.arrowBack)
   val controlF: Control[F] = control
+  val telemetryF: Telemetry[F] = telemetry
 }
 
 object Environment {
@@ -76,13 +79,18 @@ object Environment {
 
   private implicit val LoggerName = Logging.LoggerName(getClass.getSimpleName.stripSuffix("$"))
 
+  val appId = java.util.UUID.randomUUID.toString
+
   case class CloudServices[F[_]](blobStorage: BlobStorage[F],
                                  queueConsumer: Queue.Consumer[F],
                                  loadAuthService: LoadAuthService[F],
                                  jsonPathDiscovery: JsonPathDiscovery[F],
                                  secretStore: SecretStore[F])
 
-  def initialize[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel](cli: CliConfig, statementer: Target): Resource[F, Environment[F]] =
+  def initialize[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel](cli: CliConfig,
+                                                                               statementer: Target,
+                                                                               appName: String,
+                                                                               appVersion: String): Resource[F, Environment[F]] =
     for {
       blocker <- Blocker[F]
       implicit0(logger: Logger[F]) = Slf4jLogger.getLogger[F]
@@ -104,6 +112,15 @@ object Environment {
       implicit0(secretStore: SecretStore[F]) = cloudServices.secretStore
       _ <- SSH.resource(cli.config.storage.sshTunnel)
       transaction <- Transaction.interpreter[F](cli.config.storage, blocker)
+      telemetry <- Telemetry.build[F](
+        cli.config.telemetry,
+        appName,
+        appVersion,
+        httpClient,
+        appId,
+        getRegionForTelemetry(cli.config),
+        getCloudForTelemetry(cli.config)
+      )
     } yield new Environment[F](
       cache,
       logging,
@@ -116,7 +133,8 @@
       transaction,
       statementer,
       cli.config.timeouts,
-      control
+      control,
+      telemetry
     )
 
   def initSentry[F[_]: Logging: Sync](dsn: Option[URI]): Resource[F, Option[SentryClient]] =
@@ -164,4 +182,17 @@ object Environment {
         secretStore <- SecretManager.secretManager[F]
       } yield CloudServices(blobStorage, queueConsumer, loadAuthService, jsonPathDiscovery, secretStore)
   }
+
+  def getCloudForTelemetry(config: Config[_]): Option[Telemetry.Cloud] =
+    config.cloud match {
+      case _: Cloud.AWS => Telemetry.Cloud.Aws.some
+      case _: Cloud.GCP => Telemetry.Cloud.Gcp.some
+    }
+
+  def getRegionForTelemetry(config: Config[_]): Option[String] =
+    config.cloud match {
+      case c: Cloud.AWS => c.region.name.some
+      case _ => None
+    }
+
 }

diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala
index 18c22f134..d4a359b70 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala
@@ -23,8 +23,8 @@ import cats.effect.IO
 
 import org.http4s.implicits._
 
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.config.Region
-
 import com.snowplowanalytics.snowplow.rdbloader.common.RegionSpec
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
@@ -136,6 +136,34 @@ object ConfigSpec {
   val exampleInitRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
   val exampleFeatureFlags: Config.FeatureFlags = Config.FeatureFlags(addLoadTstampColumn = true)
   val exampleCloud: Config.Cloud = Config.Cloud.AWS(exampleRegion, exampleMessageQueue)
+  val exampleTelemetry =
+    Telemetry.Config(
+      false,
+      15.minutes,
+      "POST",
+      "collector-g.snowplowanalytics.com",
+      443,
+      true,
+      Some("my_pipeline"),
+      Some("hfy67e5ydhtrd"),
+      Some("665bhft5u6udjf"),
+      Some("rdb-loader-ce"),
+      Some("1.0.0")
+    )
+  val defaultTelemetry =
+    Telemetry.Config(
+      false,
+      15.minutes,
+      "POST",
+      "collector-g.snowplowanalytics.com",
+      443,
+      true,
+      None,
+      None,
+      None,
+      None,
+      None
+    )
   val exampleConfig = Config(
     exampleSnowflake,
     exampleCloud,
@@ -147,7 +175,8 @@ object ConfigSpec {
     exampleRetries,
     exampleReadyCheck,
     exampleInitRetries,
-    exampleFeatureFlags
+    exampleFeatureFlags,
+    exampleTelemetry
   )
 
   def getConfig[A](confPath: String, parse: String => EitherT[IO, String, A]): Either[String, A] =

diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala
index 72d497f49..4982b9366 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala
@@ -40,7 +40,8 @@ object SpecHelpers {
     ConfigSpec.exampleRetries,
     ConfigSpec.exampleReadyCheck,
     ConfigSpec.exampleInitRetries,
-    ConfigSpec.exampleFeatureFlags
+    ConfigSpec.exampleFeatureFlags,
+    ConfigSpec.exampleTelemetry
   )
 
   val validCliConfig: CliConfig = CliConfig(validConfig, false, resolverJson)
diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala
index 7c7417122..3f1fc7027 100644
--- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala
+++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala
@@ -19,5 +19,9 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner
 
 object Main extends IOApp {
   def run(args: List[String]): IO[ExitCode] =
-    Runner.run[IO](args, Redshift.build)
+    Runner.run[IO](
+      args,
+      Redshift.build,
+      "rdb-loader-redshift"
+    )
 }

diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala
index 96ad811ff..a41714fc1 100644
--- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala
+++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala
@@ -38,7 +38,8 @@ class ConfigSpec extends Specification {
         exampleRetries,
         exampleReadyCheck,
         exampleInitRetries,
-        exampleFeatureFlags
+        exampleFeatureFlags,
+        exampleTelemetry
       )
       result must beRight(expected)
     }
@@ -56,7 +57,8 @@ class ConfigSpec extends Specification {
         exampleRetries.copy(cumulativeBound = None),
         exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
         exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
-        exampleFeatureFlags
+        exampleFeatureFlags,
+        defaultTelemetry
       )
       result must beRight(expected)
     }

diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala
index be18fc0dc..01c08c261 100644
--- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala
+++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala
@@ -18,5 +18,9 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner
 
 object Main extends IOApp {
   def run(args: List[String]): IO[ExitCode] =
-    Runner.run[IO](args, Snowflake.build)
+    Runner.run[IO](
+      args,
+      Snowflake.build,
+      "rdb-loader-snowflake"
+    )
 }

diff --git a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala
index ed6edf59c..8609765f0 100644
--- a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala
+++ b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala
@@ -45,7 +45,8 @@ class ConfigSpec extends Specification {
         exampleRetries,
         exampleReadyCheck,
         exampleInitRetries,
-        exampleFeatureFlags
+        exampleFeatureFlags,
+        exampleTelemetry
       )
      result must beRight(expected)
    }
@@ -85,7 +86,8 @@ class ConfigSpec extends Specification {
         exampleRetries,
         exampleReadyCheck,
         exampleInitRetries,
-        exampleFeatureFlags
+        exampleFeatureFlags,
+        exampleTelemetry
       )
       result must beRight(expected)
     }
@@ -106,7 +108,8 @@ class ConfigSpec extends Specification {
         exampleRetries.copy(cumulativeBound = None),
         exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
         exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
-        exampleFeatureFlags
+        exampleFeatureFlags,
+        defaultTelemetry
       )
       result must beRight(expected)
     }
@@ -135,7 +138,8 @@ class ConfigSpec extends Specification {
         exampleRetries.copy(cumulativeBound = None),
         exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
         exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
-        exampleFeatureFlags
+        exampleFeatureFlags,
+        defaultTelemetry
       )
       result must beRight(expected)
     }

diff --git a/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/ConfigSpec.scala b/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/ConfigSpec.scala
index a1ad47010..9589ac1ed 100644
--- a/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/ConfigSpec.scala
+++ b/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kinesis/ConfigSpec.scala
@@ -23,6 +23,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor
 
 import com.snowplowanalytics.iglu.core.SchemaCriterion
 
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.config.{Kinesis => AWSKinesis}
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Validations
 import com.snowplowanalytics.snowplow.rdbloader.common.config.{Region, TransformerConfig}
@@ -127,7 +128,7 @@ object ConfigSpec {
       Config.MetricsReporters(None, Some(Config.MetricsReporters.Stdout(1.minutes, None)), true)
     )
   val exampleTelemetry =
-    Config.Telemetry(
+    Telemetry.Config(
       false,
       15.minutes,
       "POST",
@@ -141,7 +142,7 @@ object ConfigSpec {
       Some("1.0.0")
     )
   val defaultTelemetry =
-    Config.Telemetry(
+    Telemetry.Config(
       false,
       15.minutes,
       "POST",

diff --git a/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala b/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala
index a19c10934..b8d7f92ff 100644
--- a/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala
+++ b/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala
@@ -20,6 +20,7 @@ import scala.concurrent.duration._
 import cats.effect.IO
 
 import com.snowplowanalytics.snowplow.badrows.Processor
+import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Validations
 import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
@@ -103,7 +104,7 @@ object ConfigSpec {
       Config.MetricsReporters(None, Some(Config.MetricsReporters.Stdout(1.minutes, None)), true)
     )
   val exampleTelemetry =
-    Config.Telemetry(
+    Telemetry.Config(
       false,
       15.minutes,
       "POST",
@@ -117,7 +118,7 @@ object ConfigSpec {
       Some("1.0.0")
     )
   val defaultTelemetry =
-    Config.Telemetry(
+    Telemetry.Config(
       false,
       15.minutes,
       "POST",

diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index db3f0bde0..3b0c9760e 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -203,7 +203,10 @@ object Dependencies {
       fs2,
       ssm,
       log4cats,
-      fs2BlobstoreCore
+      fs2BlobstoreCore,
+      scalaTracker,
+      scalaTrackerEmit,
+      http4sClient
     )
 
   val loaderDependencies = Seq(
@@ -265,15 +268,12 @@ object Dependencies {
       commons,
       kafkaClients,
       log4cats,
-      http4sClient,
       catsEffectLaws,
       circeOptics,
       parquet4s,
       hadoop,
       hadoopAws,
       parquetHadoop,
-      scalaTracker,
-      scalaTrackerEmit,
       specs2,
       specs2ScalaCheck,
       scalaCheck