Skip to content

Commit

Permalink
Loader: add telemetry (close #617)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Oct 17, 2022
1 parent 3b7444a commit f63fa3d
Show file tree
Hide file tree
Showing 23 changed files with 350 additions and 104 deletions.
30 changes: 30 additions & 0 deletions config/loader/aws/databricks.config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,34 @@
# the next time it will get this (or anything) from a queue has this delay
"sqsVisibility": "5 minutes"
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
  # Set to true to disable telemetry
  "disable": false
  # Interval for the heartbeat event (quoted, consistent with other durations in this file)
  "interval": "15 minutes"
  # HTTP method used to send the heartbeat event
  "method": "POST"
  # URI of the collector receiving the heartbeat event
  "collectorUri": "collector-g.snowplowanalytics.com"
  # Port of the collector receiving the heartbeat event
  "collectorPort": 443
  # Whether to use https or not
  "secure": true
  # Identifier intended to tie events together across modules,
  # infrastructure and apps when used consistently
  "userProvidedId": "my_pipeline"
  # ID automatically generated upon running a module's deployment script
  # Intended to identify each independent module, and the infrastructure it controls
  "autoGeneratedId": "hfy67e5ydhtrd"
  # Unique identifier for the VM instance
  # Unique for each instance of the app running within a module
  "instanceId": "665bhft5u6udjf"
  # Name of the terraform module that deployed the app
  "moduleName": "rdb-loader-ce"
  # Version of the terraform module that deployed the app
  "moduleVersion": "1.0.0"
}
}
30 changes: 30 additions & 0 deletions config/loader/aws/redshift.config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,34 @@
# the next time it will get this (or anything) from a queue has this delay
"sqsVisibility": "5 minutes"
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
  # Set to true to disable telemetry
  "disable": false
  # Interval for the heartbeat event (quoted, consistent with other durations in this file)
  "interval": "15 minutes"
  # HTTP method used to send the heartbeat event
  "method": "POST"
  # URI of the collector receiving the heartbeat event
  "collectorUri": "collector-g.snowplowanalytics.com"
  # Port of the collector receiving the heartbeat event
  "collectorPort": 443
  # Whether to use https or not
  "secure": true
  # Identifier intended to tie events together across modules,
  # infrastructure and apps when used consistently
  "userProvidedId": "my_pipeline"
  # ID automatically generated upon running a module's deployment script
  # Intended to identify each independent module, and the infrastructure it controls
  "autoGeneratedId": "hfy67e5ydhtrd"
  # Unique identifier for the VM instance
  # Unique for each instance of the app running within a module
  "instanceId": "665bhft5u6udjf"
  # Name of the terraform module that deployed the app
  "moduleName": "rdb-loader-ce"
  # Version of the terraform module that deployed the app
  "moduleVersion": "1.0.0"
}
}
30 changes: 30 additions & 0 deletions config/loader/aws/snowflake.config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,34 @@
# the next time it will get this (or anything) from a queue has this delay
"sqsVisibility": "5 minutes"
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
  # Set to true to disable telemetry
  "disable": false
  # Interval for the heartbeat event (quoted, consistent with other durations in this file)
  "interval": "15 minutes"
  # HTTP method used to send the heartbeat event
  "method": "POST"
  # URI of the collector receiving the heartbeat event
  "collectorUri": "collector-g.snowplowanalytics.com"
  # Port of the collector receiving the heartbeat event
  "collectorPort": 443
  # Whether to use https or not
  "secure": true
  # Identifier intended to tie events together across modules,
  # infrastructure and apps when used consistently
  "userProvidedId": "my_pipeline"
  # ID automatically generated upon running a module's deployment script
  # Intended to identify each independent module, and the infrastructure it controls
  "autoGeneratedId": "hfy67e5ydhtrd"
  # Unique identifier for the VM instance
  # Unique for each instance of the app running within a module
  "instanceId": "665bhft5u6udjf"
  # Name of the terraform module that deployed the app
  "moduleName": "rdb-loader-ce"
  # Version of the terraform module that deployed the app
  "moduleVersion": "1.0.0"
}
}
30 changes: 30 additions & 0 deletions config/loader/gcp/snowflake.config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,34 @@
# before considering Snowflake unhealthy
"nonLoading": "10 minutes"
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
  # Set to true to disable telemetry
  "disable": false
  # Interval for the heartbeat event (quoted, consistent with other durations in this file)
  "interval": "15 minutes"
  # HTTP method used to send the heartbeat event
  "method": "POST"
  # URI of the collector receiving the heartbeat event
  "collectorUri": "collector-g.snowplowanalytics.com"
  # Port of the collector receiving the heartbeat event
  "collectorPort": 443
  # Whether to use https or not
  "secure": true
  # Identifier intended to tie events together across modules,
  # infrastructure and apps when used consistently
  "userProvidedId": "my_pipeline"
  # ID automatically generated upon running a module's deployment script
  # Intended to identify each independent module, and the infrastructure it controls
  "autoGeneratedId": "hfy67e5ydhtrd"
  # Unique identifier for the VM instance
  # Unique for each instance of the app running within a module
  "instanceId": "665bhft5u6udjf"
  # Name of the terraform module that deployed the app
  "moduleName": "rdb-loader-ce"
  # Version of the terraform module that deployed the app
  "moduleVersion": "1.0.0"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.circe.generic.semiauto._

import scala.concurrent.duration.{Duration, FiniteDuration}

import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.config.{ConfigUtils, TransformerConfig}
import com.snowplowanalytics.snowplow.rdbloader.common.config.implicits._
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
Expand All @@ -35,7 +36,7 @@ final case class Config(input: Config.StreamInput,
queue: Config.QueueConfig,
formats: TransformerConfig.Formats,
monitoring: Config.Monitoring,
telemetry: Config.Telemetry,
telemetry: Telemetry.Config,
featureFlags: TransformerConfig.FeatureFlags,
validations: TransformerConfig.Validations)

Expand Down Expand Up @@ -155,23 +156,6 @@ object Config {
deriveDecoder[MetricsReporters]
}

// Settings for the telemetry heartbeat event, decoded from the optional
// "telemetry" section of the loader HOCON config (see the reference configs).
case class Telemetry(
// Set to true to disable telemetry entirely
disable: Boolean,
// Interval between heartbeat events
interval: FiniteDuration,
// HTTP method used to send the heartbeat event (e.g. "POST")
method: String,
// URI of the collector receiving the heartbeat event
collectorUri: String,
// Port of the collector receiving the heartbeat event
collectorPort: Int,
// Whether to use https when contacting the collector
secure: Boolean,
// Identifier intended to tie events together across modules and apps
userProvidedId: Option[String],
// ID generated by a module's deployment script, identifying that module
autoGeneratedId: Option[String],
// Unique identifier for the VM instance running this app
instanceId: Option[String],
// Name of the terraform module that deployed the app
moduleName: Option[String],
// Version of the terraform module that deployed the app
moduleVersion: Option[String]
)

// circe decoder derived from the case class fields above
implicit val telemetryDecoder: Decoder[Telemetry] =
deriveDecoder[Telemetry]

trait Decoders extends TransformerConfig.Decoders {

implicit val streamInputConfigDecoder: Decoder[StreamInput] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import io.circe.Json
import cats.implicits._
import cats.effect._

import org.http4s.client.blaze.BlazeClientBuilder

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.{InitListCache, InitSchemaCache, Resolver}

import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, BlobStorage}
import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils

import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.Checkpointer

case class Resources[F[_], C](
Expand Down Expand Up @@ -72,7 +74,16 @@ object Resources {
instanceId <- mkTransformerInstanceId
blocker <- Blocker[F]
metrics <- Resource.eval(Metrics.build[F](blocker, config.monitoring.metrics))
telemetry <- Telemetry.build[F](config, buildName, buildVersion, executionContext)
httpClient <- BlazeClientBuilder[F](executionContext).resource
telemetry <- Telemetry.build[F](
config.telemetry,
buildName,
buildVersion,
httpClient,
AppId.appId,
getRegionFromConfig(config),
getCloudFromConfig(config)
)
inputStream <- mkSource(blocker, config.input, config.monitoring)
blobStorage <- mkSink(blocker, config.output)
} yield
Expand Down Expand Up @@ -112,4 +123,17 @@ object Resources {
.eval(Sync[F].delay(UUID.randomUUID()))
.evalTap(id => logger.info(s"Instantiated $id shredder instance"))
}

/** Extract the cloud region from the stream input configuration.
  * Only a Kinesis input carries a region; every other input yields None.
  */
private def getRegionFromConfig(config: Config): Option[String] =
  Some(config.input).collect { case kinesis: Config.StreamInput.Kinesis =>
    kinesis.region.name
  }

/** Infer the cloud provider for telemetry from the stream input type:
  * Kinesis implies AWS, Pubsub implies GCP, anything else is unknown.
  */
private def getCloudFromConfig(config: Config): Option[Telemetry.Cloud] =
  Some(config.input).collect {
    case _: Config.StreamInput.Kinesis => Telemetry.Cloud.Aws
    case _: Config.StreamInput.Pubsub  => Telemetry.Cloud.Gcp
  }
}
Loading

0 comments on commit f63fa3d

Please sign in to comment.