Skip to content

Commit 0170c9f

Browse files
ZAYEC77benjben
authored andcommitted
Add custom endpoint overrides for Kinesis and DynamoDB (closes #157)
1 parent 003f24b commit 0170c9f

File tree

4 files changed

+30
-9
lines changed

4 files changed

+30
-9
lines changed

core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ object Config {
8484
initialTimestamp: Option[String],
8585
maxRecords: Long,
8686
region: String,
87-
appName: String)
88-
extends Queue {
87+
appName: String,
88+
customEndpoint: Option[String],
89+
dynamodbCustomEndpoint: Option[String]
90+
) extends Queue {
8991
val timestampEither = initialTimestamp
9092
.toRight("An initial timestamp needs to be provided when choosing AT_TIMESTAMP")
9193
.right
@@ -99,10 +101,15 @@ object Config {
99101

100102
val timestamp = timestampEither.right.toOption
101103

102-
val endpoint = region match {
104+
val endpoint = customEndpoint.getOrElse(region match {
103105
case cn @ "cn-north-1" => s"https://kinesis.$cn.amazonaws.com.cn"
104106
case _ => s"https://kinesis.$region.amazonaws.com"
105-
}
107+
})
108+
109+
val dynamodbEndpoint = dynamodbCustomEndpoint.getOrElse(region match {
110+
case cn @ "cn-north-1" => s"https://dynamodb.$cn.amazonaws.com.cn"
111+
case _ => s"https://dynamodb.$region.amazonaws.com"
112+
})
106113
}
107114
}
108115

core/src/main/scala/com.snowplowanalytics.stream/loader/EsLoaderBadRow.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ case class EsLoaderBadRow(line: String, errors: NonEmptyList[String]) {
4040
.withZone(DateTimeZone.UTC)
4141

4242
def toCompactJson =
43-
Json.obj(
44-
"line" -> line.asJson,
45-
"errors" -> errors.asJson,
46-
"failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson
47-
).noSpaces
43+
Json
44+
.obj(
45+
"line" -> line.asJson,
46+
"errors" -> errors.asJson,
47+
"failure_tstamp" -> getTstamp(tstamp, tstampFormat).asJson
48+
)
49+
.noSpaces
4850
}
4951

5052
object EsLoaderBadRow {

core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class KinesisSourceExecutor[A, B](
6767
kcc.AWS_CREDENTIALS_PROVIDER,
6868
kcc.WORKER_ID)
6969
.withKinesisEndpoint(kcc.KINESIS_ENDPOINT)
70+
.withDynamoDBEndpoint(kcc.DYNAMODB_ENDPOINT)
7071
.withFailoverTimeMillis(kcc.FAILOVER_TIME)
7172
.withMaxRecords(kcc.MAX_RECORDS)
7273
.withIdleTimeBetweenReadsInMillis(kcc.IDLE_TIME_BETWEEN_READS)
@@ -101,6 +102,7 @@ class KinesisSourceExecutor[A, B](
101102
queue: Queue.Kinesis): KinesisConnectorConfiguration = {
102103
val props = new Properties
103104
props.setProperty(KinesisConnectorConfiguration.PROP_KINESIS_ENDPOINT, queue.endpoint)
105+
props.setProperty(KinesisConnectorConfiguration.PROP_DYNAMODB_ENDPOINT, queue.dynamodbEndpoint)
104106
props.setProperty(KinesisConnectorConfiguration.PROP_APP_NAME, queue.appName.trim)
105107
props.setProperty(
106108
KinesisConnectorConfiguration.PROP_INITIAL_POSITION_IN_STREAM,

examples/config.hocon.sample

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,16 @@ queue {
7878
# Region where the Kinesis stream is located
7979
region = "{{kinesisRegion}}"
8080

81+
# Optional endpoint url configuration to override aws kinesis endpoints,
82+
# this can be used to specify local endpoints when using localstack
83+
# customEndpoint = {{kinesisEndpoint}}
84+
# customEndpoint = ${?ENRICH_STREAMS_SOURCE_SINK_CUSTOM_ENDPOINT}
85+
86+
# Optional endpoint url configuration to override aws dyanomdb endpoints for Kinesis checkpoints lease table,
87+
# this can be used to specify local endpoints when using Localstack
88+
# dynamodbCustomEndpoint = "http://localhost:4569"
89+
# dynamodbCustomEndpoint = ${?ENRICH_DYNAMODB_CUSTOM_ENDPOINT}
90+
8191
# "appName" is used for a DynamoDB table to maintain stream state.
8292
# You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
8393
appName = "{{kinesisAppName}}"

0 commit comments

Comments
 (0)