From fcdb2c86429a0dbfd6bc09193fbaeb521ca84456 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 5 Nov 2020 14:28:11 +0000 Subject: [PATCH] Adds jaeger as a tracer --- build.sbt | 1 + .../Collector.scala | 17 ++- .../CollectorRoute.scala | 128 ++++++++++-------- .../CollectorService.scala | 33 +++-- .../model.scala | 10 +- project/Dependencies.scala | 2 + 6 files changed, 119 insertions(+), 72 deletions(-) diff --git a/build.sbt b/build.sbt index 5d53bbd41..322d17e2e 100644 --- a/build.sbt +++ b/build.sbt @@ -28,6 +28,7 @@ lazy val commonDependencies = Seq( Dependencies.Libraries.prometheusCommon, Dependencies.Libraries.opentracingApi, Dependencies.Libraries.opentracingNoop, + Dependencies.Libraries.jaeger, // Scala Dependencies.Libraries.scopt, Dependencies.Libraries.scalaz7, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala index 8af44f796..f40132338 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/Collector.scala @@ -24,6 +24,8 @@ import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.sslconfig.akka.AkkaSSLConfig +import io.jaegertracing.{Configuration => JaegerConfiguration} +import io.opentracing.Tracer import io.opentracing.noop.NoopTracerFactory import org.slf4j.LoggerFactory import pureconfig._ @@ -40,7 +42,8 @@ trait Collector { lazy val log = LoggerFactory.getLogger(getClass()) implicit def hint[T] = ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase)) - implicit val _ = new FieldCoproductHint[SinkConfig]("enabled") + implicit val sinkHint = new FieldCoproductHint[SinkConfig]("enabled") + implicit val tracerHint = new FieldCoproductHint[TracerConfig]("enabled") def parseConfig(args: Array[String]): (CollectorConfig, Config) = { case class FileConfig(config: File = new File(".")) @@ -69,13 +72,23 @@ trait Collector { (loadConfigOrThrow[CollectorConfig](conf.getConfig("collector")), conf) } + def tracer(config: TracerConfig): Tracer = + config match { + case TracerConfig.Noop => + log.debug("Using noop tracer") + NoopTracerFactory.create + case TracerConfig.Jaeger => + log.debug("Using jaeger tracer") + JaegerConfiguration.fromEnv.getTracer + } + def run(collectorConf: CollectorConfig, akkaConf: Config, sinks: CollectorSinks): Unit = { implicit val system = ActorSystem.create("scala-stream-collector", akkaConf) implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher - val sharedTracer = NoopTracerFactory.create + val sharedTracer = tracer(collectorConf.tracer) val collectorRoute = new CollectorRoute { override def collectorService = new CollectorService(collectorConf, sinks, sharedTracer) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala index 7310b5e71..8f2804f91 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorRoute.scala @@ -16,7 +16,7 @@ package com.snowplowanalytics.snowplow.collectors.scalastream import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpCookiePair -import akka.http.scaladsl.server.{Directive1, Route, StandardRoute} +import akka.http.scaladsl.server.{Directive1, Route} import akka.http.scaladsl.server.Directives._ import io.opentracing.{Span, Tracer} @@ -24,8 +24,6 @@ import io.opentracing.{Span, Tracer} import model.DntCookieMatcher import monitoring.BeanRegistry -import scala.concurrent.ExecutionContext - trait CollectorRoute { def collectorService: Service def tracer: Tracer @@ -47,32 +45,65 @@ trait CollectorRoute { complete(StatusCodes.NotFound -> "redirects disabled") } - def completeWithSpan(r: HttpResponse, span: Span): StandardRoute = + def traceRoute(inner: Span => Route): Route = requestContext => { - val fut = complete(r)(requestContext) + val span = tracer.buildSpan("HandleRequest").start + val fut = inner(span)(requestContext) fut.onComplete { _ => span.finish - }(ExecutionContext.global) + }(requestContext.executionContext) fut } + // Activates the span only for the local thread + def withActiveSpan[T](span: Span)(f: => T): T = { + val scope = tracer.activateSpan(span) + try { + f + } finally { + scope.close + } + } + def routes: Route = doNotTrack(collectorService.doNotTrackCookie) { dnt => - val span = tracer.buildSpan("CollectorRequest").start - cookieIfWanted(collectorService.cookieName) { reqCookie => - val cookie = reqCookie.map(_.toCookie) - headers { (userAgent, refererURI, rawRequestURI) => - val qs = queryString(rawRequestURI) - extractors { (host, ip, request) => - // get the adapter vendor and version from the path - path(Segment / Segment) { (vendor, version) => - val path = collectorService.determinePath(vendor, version) - post { - extractContentType { ct => - entity(as[String]) { body => + traceRoute { span => + cookieIfWanted(collectorService.cookieName) { reqCookie => + val cookie = reqCookie.map(_.toCookie) + headers { (userAgent, refererURI, rawRequestURI) => + val qs = queryString(rawRequestURI) + extractors { (host, ip, request) => + // get the adapter vendor and version from the path + path(Segment / Segment) { (vendor, version) => + val path = collectorService.determinePath(vendor, version) + post { + extractContentType { ct => + entity(as[String]) { body => + withActiveSpan(span) { + val (r, _) = collectorService.cookie( + qs, + Some(body), + path, + cookie, + userAgent, + refererURI, + host, + ip, + request, + pixelExpected = false, + doNotTrack = dnt, + Some(ct)) + incrementRequests(r.status) + complete(r) + } + } + } + } ~ + (get | head) { + withActiveSpan(span) { val (r, _) = collectorService.cookie( qs, - Some(body), + None, path, cookie, userAgent, @@ -80,47 +111,32 @@ trait CollectorRoute { host, ip, request, - pixelExpected = false, - doNotTrack = dnt, - Some(ct)) + pixelExpected = true, + doNotTrack = dnt) incrementRequests(r.status) - completeWithSpan(r, span) + complete(r) } } } ~ - (get | head) { - val (r, _) = collectorService.cookie( - qs, - None, - path, - cookie, - userAgent, - refererURI, - host, - ip, - request, - pixelExpected = true, - doNotTrack = dnt) - incrementRequests(r.status) - completeWithSpan(r, span) - } - } ~ - path("""ice\.png""".r | "i".r) { path => - (get | head) { - val (r, _) = collectorService.cookie( - qs, - None, - "/" + path, - cookie, - userAgent, - refererURI, - host, - ip, - request, - pixelExpected = true, - doNotTrack = dnt) - incrementRequests(r.status) - completeWithSpan(r, span) + path("""ice\.png""".r | "i".r) { path => + (get | head) { + withActiveSpan(span) { + val (r, _) = collectorService.cookie( + qs, + None, + "/" + path, + cookie, + userAgent, + refererURI, + host, + ip, + request, + pixelExpected = true, + doNotTrack = dnt) + incrementRequests(r.status) + complete(r) + } + } } } } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala index af4ecc75c..215fecd2f 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/CollectorService.scala @@ -229,13 +229,16 @@ class CollectorService( ): List[Array[Byte]] = { // Split events into Good and Bad val eventSplit = SplitBatch.splitAndSerializePayload(event, sinks.good.MaxBytes) - // Send events to respective sinks - val span = tracer.buildSpan("SinkRawEvents").start - val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) - val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) - span.finish - // Sink Responses for Test Sink - sinkResponseGood ++ sinkResponseBad + val span = tracer.buildSpan("SinkRawEvents").start() + try { + // Send events to respective sinks + val sinkResponseGood = sinks.good.storeRawEvents(eventSplit.good, partitionKey) + val sinkResponseBad = sinks.bad.storeRawEvents(eventSplit.bad, partitionKey) + // Sink Responses for Test Sink + sinkResponseGood ++ sinkResponseBad + } finally { + span.finish + } } /** Builds the final http response from */ @@ -351,12 +354,16 @@ class CollectorService( case other => Some(other.toString) } - def tracerHeaders: Iterable[String] = { - val m = scala.collection.mutable.Map.empty[String, String] - val adapter = new TextMapAdapter(m.asJava) - tracer.inject(tracer.activeSpan.context, Format.Builtin.HTTP_HEADERS, adapter) - m.map { case (k, v) => s"$k: $v" } - } + def tracerHeaders: Iterable[String] = + Option(tracer.activeSpan) match { + case Some(span) => + val m = scala.collection.mutable.Map.empty[String, String] + val adapter = new TextMapAdapter(m.asJava) + tracer.inject(span.context, Format.Builtin.HTTP_HEADERS, adapter) + m.map { case (k, v) => s"$k: $v" } + case None => + Iterable() + } /** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */ def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] = diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala index a95332ac1..f17561452 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/model.scala @@ -142,6 +142,13 @@ package model { redirect: Boolean = false, port: Int = 443 ) + + sealed trait TracerConfig + object TracerConfig { + case object Noop extends TracerConfig + case object Jaeger extends TracerConfig + } + final case class CollectorConfig( interface: String, port: Int, @@ -157,7 +164,8 @@ package model { streams: StreamsConfig, prometheusMetrics: PrometheusMetricsConfig, enableDefaultRedirect: Boolean = false, - ssl: SSLConfig = SSLConfig() + ssl: SSLConfig = SSLConfig(), + tracer: TracerConfig = TracerConfig.Noop ) { val cookieConfig = if (cookie.enabled) Some(cookie) else None val doNotTrackHttpCookie = diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3ef6cf1aa..19f3923fe 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -41,6 +41,7 @@ object Dependencies { val commonsCodec = "1.13" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val grpcCore = "1.31.0" // force this version of lib from dependencies to mitigate secutiry vulnerabilities, TODO: update underlying libraries val opentracing = "0.33.0" + val jaeger = "1.4.0" // Scala val collectorPayload = "0.0.0" val scalaz7 = "7.0.9" @@ -74,6 +75,7 @@ object Dependencies { val cbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.cbor val opentracingApi = "io.opentracing" % "opentracing-api" % V.opentracing val opentracingNoop = "io.opentracing" % "opentracing-noop" % V.opentracing + val jaeger = "io.jaegertracing" % "jaeger-client" % V.jaeger val retry = "com.softwaremill.retry" %% "retry" % V.retry // Scala