Skip to content

Commit d839460

Browse files
istreeterspenes
authored andcommitted
Common: Change http4s client backend to async-http-client (close #903)
1 parent d4ee811 commit d839460

File tree

6 files changed

+12
-20
lines changed

6 files changed

+12
-20
lines changed

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import cats.effect.{ContextShift, Blocker, Clock, Resource, Timer, ConcurrentEff
2222

2323
import doobie.ConnectionIO
2424

25-
import org.http4s.client.blaze.BlazeClientBuilder
25+
import org.http4s.client.asynchttpclient.AsyncHttpClient
2626

2727
import io.sentry.{SentryClient, Sentry, SentryOptions}
2828

@@ -76,7 +76,7 @@ object Environment {
7676

7777
for {
7878
blocker <- Blocker[F]
79-
httpClient <- BlazeClientBuilder[F](blocker.blockingContext).resource
79+
httpClient <- AsyncHttpClient.resource[F]()
8080
iglu <- Iglu.igluInterpreter(httpClient, cli.resolverConfig)
8181
implicit0(logging: Logging[F]) = Logging.loggingInterpreter[F](List(cli.config.storage.password.getUnencrypted, cli.config.storage.username))
8282
tracker <- Monitoring.initializeTracking[F](cli.config.monitoring, httpClient)

modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Main.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ object Main extends IOApp {
3535
case Right(cliConfig) =>
3636
Resources.mk[IO](
3737
cliConfig.igluConfig,
38-
cliConfig.config,
39-
executionContext
38+
cliConfig.config
4039
).use { resources =>
4140
logger[IO].info(s"Starting RDB Shredder with ${cliConfig.config} config") *>
4241
Processing.run[IO](resources, cliConfig.config)

modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Resources.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import java.util.UUID
1616
import java.net.URI
1717

1818
import scala.concurrent.duration.DurationInt
19-
import scala.concurrent.ExecutionContext
2019

2120
import org.typelevel.log4cats.slf4j.Slf4jLogger
2221

@@ -68,19 +67,17 @@ object Resources {
6867

6968
def mk[F[_]: ConcurrentEffect : ContextShift: Clock: InitSchemaCache: InitListCache: Timer](
7069
igluConfig: Json,
71-
config: TransformerConfig.Stream,
72-
executionContext: ExecutionContext
70+
config: TransformerConfig.Stream
7371
): Resource[F, Resources[F]] =
7472
for {
7573
awsQueue <- mkQueueFromConfig(config.queue)
76-
resources <- mk(igluConfig, config, awsQueue, executionContext)
74+
resources <- mk(igluConfig, config, awsQueue)
7775
} yield resources
7876

7977
def mk[F[_]: ConcurrentEffect : ContextShift: Clock: InitSchemaCache: InitListCache: Timer](
8078
igluConfig: Json,
8179
config: TransformerConfig.Stream,
82-
awsQueue: AWSQueue[F],
83-
executionContext: ExecutionContext
80+
awsQueue: AWSQueue[F]
8481
): Resource[F, Resources[F]] =
8582
for {
8683
client <- mkClient(igluConfig)
@@ -92,7 +89,7 @@ object Resources {
9289
global <- Resource.eval(Ref.of(0L))
9390
store <- Resource.eval(mkStore[F](blocker, config.output.path))
9491
metrics <- Resource.eval(Metrics.build[F](blocker, config.monitoring.metrics))
95-
telemetry <- Telemetry.build[F](config, BuildInfo.name, BuildInfo.version, executionContext)
92+
telemetry <- Telemetry.build[F](config, BuildInfo.name, BuildInfo.version)
9693
} yield
9794
Resources(
9895
client,

modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/telemetry/Telemetry.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
*/
1515
package com.snowplowanalytics.snowplow.rdbloader.transformer.telemetry
1616

17-
import scala.concurrent.ExecutionContext
18-
1917
import org.typelevel.log4cats.Logger
2018
import org.typelevel.log4cats.slf4j.Slf4jLogger
2119

@@ -27,7 +25,7 @@ import cats.effect.{ConcurrentEffect, Resource, Sync, Timer}
2725
import fs2.Stream
2826

2927
import org.http4s.client.{Client => HttpClient}
30-
import org.http4s.client.blaze.BlazeClientBuilder
28+
import org.http4s.client.asynchttpclient.AsyncHttpClient
3129

3230
import io.circe.Json
3331
import io.circe.Encoder
@@ -56,11 +54,10 @@ object Telemetry {
5654
def build[F[_]: ConcurrentEffect: Timer](
5755
config: TransformerConfig.Stream,
5856
appName: String,
59-
appVersion: String,
60-
executionContext: ExecutionContext
57+
appVersion: String
6158
): Resource[F, Telemetry[F]] =
6259
for {
63-
httpClient <- BlazeClientBuilder[F](executionContext).resource
60+
httpClient <- AsyncHttpClient.resource[F]()
6461
tracker <- initTracker(config.telemetry, appName, httpClient)
6562
} yield new Telemetry[F] {
6663
def report: Stream[F, Unit] =

modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/processing/TestApplication.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ object TestApplication {
3939
Resources.mk[IO](
4040
cliConfig.igluConfig,
4141
cliConfig.config,
42-
queueFromDeferred(forCompletionMessage),
43-
concurrent.ExecutionContext.global
42+
queueFromDeferred(forCompletionMessage)
4443
)
4544
.use { resources =>
4645
logger[IO].info(s"Starting RDB Shredder with ${cliConfig.config} config") *>

project/Dependencies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ object Dependencies {
101101
val monocleMacro = "com.github.julien-truffaut" %% "monocle-macro" % V.monocle
102102
val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
103103
val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats
104-
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
104+
val http4sClient = "org.http4s" %% "http4s-async-http-client" % V.http4s
105105
val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.scalaTracker
106106
val scalaTrackerEmit = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.scalaTracker
107107

0 commit comments

Comments
 (0)