Skip to content

Commit cb6b75a

Browse files
jbeemsterbenjben
authored andcommitted
Add ability to disable CloudWatch metrics (closes #160)
1 parent 0170c9f commit cb6b75a

File tree

3 files changed

+19
-10
lines changed

3 files changed

+19
-10
lines changed

core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ object Config {
8686
region: String,
8787
appName: String,
8888
customEndpoint: Option[String],
89-
dynamodbCustomEndpoint: Option[String]
89+
dynamodbCustomEndpoint: Option[String],
90+
disableCloudWatch: Option[Boolean]
9091
) extends Queue {
9192
val timestampEither = initialTimestamp
9293
.toRight("An initial timestamp needs to be provided when choosing AT_TIMESTAMP")

core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import com.amazonaws.services.kinesis.connectors.interfaces.IKinesisConnectorPip
3838
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
3939
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
4040
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory
41+
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory
4142

4243
// This project
4344
import com.snowplowanalytics.stream.loader.Config._
@@ -166,17 +167,20 @@ class KinesisSourceExecutor[A, B](
166167
"idleTimeBetweenReads is greater than bufferTimeMillisecondsLimit. For best results, ensure that bufferTimeMillisecondsLimit is more than or equal to idleTimeBetweenReads ")
167168
}
168169

169-
val workerBuilder = new Worker.Builder()
170-
.recordProcessorFactory(getKinesisConnectorRecordProcessorFactory())
171-
.config(kinesisClientLibConfiguration)
172-
173-
// If a metrics factory was specified, use it.
174-
if (metricFactory != null) {
175-
workerBuilder.metricsFactory(metricFactory)
170+
worker = kinesis.disableCloudWatch match {
171+
case Some(true) =>
172+
new Worker.Builder()
173+
.recordProcessorFactory(getKinesisConnectorRecordProcessorFactory())
174+
.config(kinesisClientLibConfiguration)
175+
.metricsFactory(new NullMetricsFactory())
176+
.build()
177+
case _ =>
178+
new Worker.Builder()
179+
.recordProcessorFactory(getKinesisConnectorRecordProcessorFactory())
180+
.config(kinesisClientLibConfiguration)
181+
.build()
176182
}
177183

178-
worker = workerBuilder.build()
179-
180184
LOG.info(getClass.getSimpleName + " worker created")
181185
}
182186

examples/config.hocon.sample

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ queue {
8888
# dynamodbCustomEndpoint = "http://localhost:4569"
8989
# dynamodbCustomEndpoint = ${?ENRICH_DYNAMODB_CUSTOM_ENDPOINT}
9090

91+
# Optional override to disable cloudwatch
92+
# disableCloudWatch = true
93+
# disableCloudWatch = ${?ENRICH_DISABLE_CLOUDWATCH}
94+
9195
# "appName" is used for a DynamoDB table to maintain stream state.
9296
# You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
9397
appName = "{{kinesisAppName}}"

0 commit comments

Comments
 (0)