Skip to content

Commit

Permalink
Implicit region config reader
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 29, 2021
1 parent 1069842 commit dc1a49c
Show file tree
Hide file tree
Showing 15 changed files with 479 additions and 116 deletions.
7 changes: 4 additions & 3 deletions config/config.kinesis.minimal.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@
"type": "kinesis"
"streamName": "test-kinesis-stream"
"initialPosition": "LATEST"
"region": "eu-central-1"
}
"output": {
"good": {
"client": {
"endpoint": "localhost"
}
"cluster": {
"index": "good"
}
}
"bad" {
"type": "kinesis"
"streamName": "test-kinesis-bad-stream"
"region": "eu-central-1"
}
}
"purpose": "BAD_ROWS"
"purpose": "ENRICHED_EVENTS"
}
3 changes: 3 additions & 0 deletions config/config.nsq.minimal.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"client": {
"endpoint": "localhost"
}
"cluster": {
"index": "good"
}
}
"bad" {
"type": "nsq"
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
"aws": {
"signing": false
}
"cluster": {
"index": "good"
}
}
}
"monitoring": {
Expand Down
180 changes: 101 additions & 79 deletions core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import com.monovore.decline.{Command, Opts}
import cats.syntax.either._
import cats.syntax.validated._

import com.amazonaws.regions.DefaultAwsRegionProviderChain
import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions}

import pureconfig.{CamelCase, ConfigFieldMapping, ConfigObjectSource, ConfigReader, ConfigSource}
import com.typesafe.config.ConfigOrigin

import pureconfig._
import pureconfig.generic.{FieldCoproductHint, ProductHint}
import pureconfig.generic.semiauto._
import pureconfig.error.{ConfigReaderFailures, FailureReason}
import pureconfig.error._

object Config {

Expand Down Expand Up @@ -68,7 +70,7 @@ object Config {
initialPosition: String,
initialTimestamp: Option[String],
maxRecords: Long,
region: Option[String],
region: Region,
appName: String,
customEndpoint: Option[String],
dynamodbCustomEndpoint: Option[String],
Expand All @@ -91,15 +93,6 @@ object Config {

object Kinesis {
final case class Buffer(byteLimit: Long, recordLimit: Long, timeLimit: Long)

implicit val sourceKinesisConfigReader: ConfigReader[Kinesis] =
deriveReader[Kinesis].emap { c =>
val region = c.region.orElse(getRegion)
region match {
case Some(_) => c.copy(region = region).asRight
case _ => RawFailureReason("Region isn't set in the Kinesis source").asLeft
}
}
}
}

Expand Down Expand Up @@ -131,17 +124,7 @@ object Config {
ssl: Boolean
)

final case class ESAWS(signing: Boolean, region: Option[String])
object ESAWS {
implicit val sinkGoodESAWSConfigReader: ConfigReader[ESAWS] =
deriveReader[ESAWS].emap { c =>
val region = c.region.orElse(getRegion)
if (c.signing && region.isEmpty)
RawFailureReason("Region needs to be set when AWS signing is true").asLeft
else
c.copy(region = region).asRight
}
}
final case class ESAWS(signing: Boolean, region: Region)

final case class ESCluster(index: String, documentType: Option[String])

Expand All @@ -163,19 +146,9 @@ object Config {

final case class Kinesis(
streamName: String,
region: Option[String],
region: Region,
customEndpoint: Option[String]
) extends BadSink
object Kinesis {
implicit val sinkBadKinesisConfigReader: ConfigReader[Kinesis] =
deriveReader[Kinesis].emap { c =>
val region = c.region.orElse(getRegion)
region match {
case Some(_) => c.copy(region = region).asRight
case _ => RawFailureReason("Region isn't set in the Kinesis sink").asLeft
}
}
}
}
}

Expand Down Expand Up @@ -208,48 +181,81 @@ object Config {
final case class Metrics(cloudWatch: Boolean)
}

final case class RawFailureReason(description: String) extends FailureReason
/** AWS region identified by its name, e.g. the value of a "region" key in the HOCON config. */
final case class Region(name: String)

implicit val streamLoaderConfigReader: ConfigReader[StreamLoaderConfig] =
deriveReader[StreamLoaderConfig]
implicit val sourceConfigReader: ConfigReader[Source] =
deriveReader[Source]
implicit val sourceStdinConfigReader: ConfigReader[Source.Stdin.type] =
deriveReader[Source.Stdin.type]
implicit val sourceNsqConfigReader: ConfigReader[Source.Nsq] =
deriveReader[Source.Nsq]
implicit val sourceNsqBufferConfigReader: ConfigReader[Source.Nsq.Buffer] =
deriveReader[Source.Nsq.Buffer]
implicit val sourceKinesisConfigBufferReader: ConfigReader[Source.Kinesis.Buffer] =
deriveReader[Source.Kinesis.Buffer]
implicit val sinkConfigReader: ConfigReader[Sink] =
deriveReader[Sink]
implicit val sinkGoodConfigReader: ConfigReader[Sink.GoodSink] =
deriveReader[Sink.GoodSink]
implicit val sinkGoodStdoutConfigReader: ConfigReader[Sink.GoodSink.Stdout.type] =
deriveReader[Sink.GoodSink.Stdout.type]
implicit val sinkGoodESConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch] =
deriveReader[Sink.GoodSink.Elasticsearch]
implicit val sinkGoodESClientConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESClient] =
deriveReader[Sink.GoodSink.Elasticsearch.ESClient]
implicit val sinkGoodESClusterConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESCluster] =
deriveReader[Sink.GoodSink.Elasticsearch.ESCluster]
implicit val sinkGoodESChunkConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESChunk] =
deriveReader[Sink.GoodSink.Elasticsearch.ESChunk]
implicit val sinkBadSinkConfigReader: ConfigReader[Sink.BadSink] =
deriveReader[Sink.BadSink]
implicit val sinkBadNoneConfigReader: ConfigReader[Sink.BadSink.None.type] =
deriveReader[Sink.BadSink.None.type]
implicit val sinkBadStderrConfigReader: ConfigReader[Sink.BadSink.Stderr.type] =
deriveReader[Sink.BadSink.Stderr.type]
implicit val sinkBadNsqConfigReader: ConfigReader[Sink.BadSink.Nsq] =
deriveReader[Sink.BadSink.Nsq]
implicit val monitoringConfigReader: ConfigReader[Monitoring] =
deriveReader[Monitoring]
implicit val snowplowMonitoringConfig: ConfigReader[Monitoring.SnowplowMonitoring] =
deriveReader[Monitoring.SnowplowMonitoring]
implicit val metricsConfigReader: ConfigReader[Monitoring.Metrics] =
deriveReader[Monitoring.Metrics]
/** Failure reason that carries nothing but a free-form description. */
final case class RawFailureReason(description: String) extends FailureReason
/** Config reader failure with a free-form description and an optional origin (None by default). */
final case class RawConfigReaderFailure(description: String, origin: Option[ConfigOrigin] = None)
extends ConfigReaderFailure

/**
 * Bundle of pureconfig readers needed to load a [[StreamLoaderConfig]].
 *
 * The reader for [[Region]] is a constructor parameter with a default implementation, so a
 * caller can construct the bundle with a different region reader. Every other reader is
 * derived inside the body with the chosen region reader in implicit scope.
 *
 * @param regionConfigReader reader used for every `Region` field; the default one falls back
 *                           to `Config.getRegion` when the key is absent and validates an
 *                           explicitly configured value with `checkRegion`
 */
case class implicits(
// ReadsMissingKeys is pureconfig's marker trait that lets this reader be invoked even when
// the key is not present in the config, which is what makes the fallback below possible.
regionConfigReader: ConfigReader[Region] with ReadsMissingKeys = new ConfigReader[Region]
with ReadsMissingKeys {
override def from(cur: ConfigCursor) =
// Key absent: try to resolve the region via Config.getRegion, fail if nothing is found.
if (cur.isUndefined)
Config.getRegion.toRight(
ConfigReaderFailures(
RawConfigReaderFailure(
"Region can not be resolved, needs to be passed explicitly"
)
)
)
else
// Key present: read it as a string and reject names that checkRegion does not accept.
cur.asString.flatMap { r =>
val region = Region(r)
checkRegion(region).leftMap(e => ConfigReaderFailures(RawConfigReaderFailure(e)))
}
}
) {
// Expose the constructor argument as an implicit so the derivations below can pick it up.
implicit val implRegionConfigReader: ConfigReader[Region] = regionConfigReader
implicit val streamLoaderConfigReader: ConfigReader[StreamLoaderConfig] =
deriveReader[StreamLoaderConfig]
// Readers for the input side of the config.
implicit val sourceConfigReader: ConfigReader[Source] =
deriveReader[Source]
implicit val sourceStdinConfigReader: ConfigReader[Source.Stdin.type] =
deriveReader[Source.Stdin.type]
implicit val sourceNsqConfigReader: ConfigReader[Source.Nsq] =
deriveReader[Source.Nsq]
implicit val sourceNsqBufferConfigReader: ConfigReader[Source.Nsq.Buffer] =
deriveReader[Source.Nsq.Buffer]
implicit val sourceKinesisConfigReader: ConfigReader[Source.Kinesis] =
deriveReader[Source.Kinesis]
implicit val sourceKinesisConfigBufferReader: ConfigReader[Source.Kinesis.Buffer] =
deriveReader[Source.Kinesis.Buffer]
// Readers for the output side of the config (good and bad sinks).
implicit val sinkConfigReader: ConfigReader[Sink] =
deriveReader[Sink]
implicit val sinkGoodConfigReader: ConfigReader[Sink.GoodSink] =
deriveReader[Sink.GoodSink]
implicit val sinkGoodStdoutConfigReader: ConfigReader[Sink.GoodSink.Stdout.type] =
deriveReader[Sink.GoodSink.Stdout.type]
implicit val sinkGoodESConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch] =
deriveReader[Sink.GoodSink.Elasticsearch]
implicit val sinkGoodESClientConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESClient] =
deriveReader[Sink.GoodSink.Elasticsearch.ESClient]
implicit val sinkGoodESClusterConfigReader: ConfigReader[
Sink.GoodSink.Elasticsearch.ESCluster
] =
deriveReader[Sink.GoodSink.Elasticsearch.ESCluster]
implicit val sinkGoodESAWSConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESAWS] =
deriveReader[Sink.GoodSink.Elasticsearch.ESAWS]
implicit val sinkGoodESChunkConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESChunk] =
deriveReader[Sink.GoodSink.Elasticsearch.ESChunk]
implicit val sinkBadSinkConfigReader: ConfigReader[Sink.BadSink] =
deriveReader[Sink.BadSink]
implicit val sinkBadKinesisConfigReader: ConfigReader[Sink.BadSink.Kinesis] =
deriveReader[Sink.BadSink.Kinesis]
implicit val sinkBadNoneConfigReader: ConfigReader[Sink.BadSink.None.type] =
deriveReader[Sink.BadSink.None.type]
implicit val sinkBadStderrConfigReader: ConfigReader[Sink.BadSink.Stderr.type] =
deriveReader[Sink.BadSink.Stderr.type]
implicit val sinkBadNsqConfigReader: ConfigReader[Sink.BadSink.Nsq] =
deriveReader[Sink.BadSink.Nsq]
// Readers for the monitoring section.
implicit val monitoringConfigReader: ConfigReader[Monitoring] =
deriveReader[Monitoring]
implicit val snowplowMonitoringConfig: ConfigReader[Monitoring.SnowplowMonitoring] =
deriveReader[Monitoring.SnowplowMonitoring]
implicit val metricsConfigReader: ConfigReader[Monitoring.Metrics] =
deriveReader[Monitoring.Metrics]
}

val config = Opts
.option[Path]("config", "Path to a HOCON configuration file")
Expand All @@ -263,7 +269,16 @@ object Config {

val command = Command("snowplow-stream-loader", generated.Settings.version, true)(config)

def parseConfig(arguments: Array[String]): Either[String, StreamLoaderConfig] =
/**
 * Parses the command-line arguments and loads the configuration using the reader bundle
 * built with its default region reader.
 *
 * @param arguments raw command-line arguments
 * @return the loaded configuration, or an error message
 */
def parseConfig(arguments: Array[String]): Either[String, StreamLoaderConfig] =
  parseConfig(arguments, Config.implicits().streamLoaderConfigReader)

def parseConfig(
arguments: Array[String],
configReader: ConfigReader[StreamLoaderConfig]
): Either[String, StreamLoaderConfig] = {
implicit val implConfigReader: ConfigReader[StreamLoaderConfig] = configReader
for {
path <- command.parse(arguments).leftMap(_.toString)
source = path.fold(ConfigSource.empty)(ConfigSource.file)
Expand All @@ -272,6 +287,7 @@ object Config {
)
parsed <- c.load[StreamLoaderConfig].leftMap(showFailures)
} yield parsed
}

/** Optionally give precedence to configs wrapped in a "esloader" block. To help avoid polluting config namespace */
private def namespaced(configObjSource: ConfigObjectSource): ConfigObjectSource =
Expand All @@ -295,8 +311,14 @@ object Config {
failureStrings.mkString("\n")
}

private def getRegion: Option[String] =
Either.catchNonFatal((new DefaultAwsRegionProviderChain).getRegion).toOption
/**
 * Resolves the region through the AWS SDK default region provider chain.
 *
 * @return the resolved region, or None when the chain throws a non-fatal exception or
 *         yields no region
 */
private def getRegion: Option[Region] =
  Either
    .catchNonFatal((new DefaultAwsRegionProviderChain).getRegion)
    .toOption
    // The AwsRegionProvider contract allows a null result when the region is unknown;
    // without this guard a null would surface as Some(Region(null)).
    .flatMap(Option(_))
    .map(Region(_))

/**
 * Checks that the given region name is one of the names listed by the AWS SDK `Regions` enum.
 *
 * @param region region to validate
 * @return the same region on success, otherwise a message listing the accepted names
 */
private def checkRegion(region: Region): Either[String, Region] = {
  val knownNames = Regions.values().map(_.getName).toList
  Either.cond(
    knownNames.contains(region.name),
    region,
    s"Region ${region.name} is unknown, choose from [${knownNames.mkString(", ")}]"
  )
}

// Used as an option prefix when reading system properties.
val Namespace = "snowplow"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ class KinesisSourceExecutor[A, B](
+ kcc.CONNECTOR_DESTINATION + ","
+ KinesisConnectorConfiguration.KINESIS_CONNECTOR_USER_AGENT
)
.withRegionName(kcc.REGION_NAME)
.condWith(kinesis.customEndpoint.isDefined, _.withKinesisEndpoint(kcc.KINESIS_ENDPOINT))
.condWith(
kinesis.dynamodbCustomEndpoint.isDefined,
_.withDynamoDBEndpoint(kcc.DYNAMODB_ENDPOINT)
)
.condWith(kinesis.region.isDefined, _.withRegionName(kcc.REGION_NAME))

timestamp
.filter(_ => initialPosition == "AT_TIMESTAMP")
Expand Down Expand Up @@ -120,7 +120,7 @@ class KinesisSourceExecutor[A, B](
props.setProperty(KinesisConnectorConfiguration.PROP_DYNAMODB_ENDPOINT, _)
)
// So that the region of the DynamoDB table is correct
kinesis.region.foreach(props.setProperty(KinesisConnectorConfiguration.PROP_REGION_NAME, _))
props.setProperty(KinesisConnectorConfiguration.PROP_REGION_NAME, kinesis.region.name)
props.setProperty(KinesisConnectorConfiguration.PROP_APP_NAME, kinesis.appName.trim)
props.setProperty(
KinesisConnectorConfiguration.PROP_INITIAL_POSITION_IN_STREAM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,10 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink {
val client = AmazonKinesisClientBuilder
.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain())
.optWith[String](conf.region, _.withRegion)
.optWith[(String, String)](
for {
e <- conf.customEndpoint
r <- conf.region
} yield (e, r),
b => { case (e, r) => b.withEndpointConfiguration(new EndpointConfiguration(e, r)) }
.withRegion(conf.region.name)
.optWith[String](
conf.customEndpoint,
b => e => b.withEndpointConfiguration(new EndpointConfiguration(e, conf.region.name))
)
.build()

Expand Down
49 changes: 49 additions & 0 deletions core/src/test/resources/config.test1.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"input": {
"type": "kinesis"
"streamName": "test-kinesis-stream"
"appName": "test-app-name"
"initialPosition": "LATEST"
"region": "ca-central-1"
"maxRecords": 2000
"buffer": {
"byteLimit": 201
"recordLimit": 202
"timeLimit": 203
}
}
"output": {
"type": "elasticsearch"
"good": {
"client": {
"endpoint": "localhost"
"maxTimeout": 205
"maxRetries": 7
"port": 9220
"ssl": true
}
"chunk": {
"byteLimit": 206
"recordLimit": 207
}
"aws": {
"signing": true
"region": "ca-central-1"
}
"cluster": {
"index": "testindex"
}
}
"bad" {
"type": "kinesis"
"streamName": "test-kinesis-bad-stream"
"region": "ca-central-1"
}
}
"purpose": "BAD_ROWS"
"monitoring": {
"metrics": {
"cloudWatch": false
}
}
}
Loading

0 comments on commit dc1a49c

Please sign in to comment.