diff --git a/config/config.kinesis.minimal.hocon b/config/config.kinesis.minimal.hocon index 5b44846..3567cc4 100644 --- a/config/config.kinesis.minimal.hocon +++ b/config/config.kinesis.minimal.hocon @@ -4,19 +4,20 @@ "type": "kinesis" "streamName": "test-kinesis-stream" "initialPosition": "LATEST" - "region": "eu-central-1" } "output": { "good": { "client": { "endpoint": "localhost" } + "cluster": { + "index": "good" + } } "bad" { "type": "kinesis" "streamName": "test-kinesis-bad-stream" - "region": "eu-central-1" } } - "purpose": "BAD_ROWS" + "purpose": "ENRICHED_EVENTS" } diff --git a/config/config.nsq.minimal.hocon b/config/config.nsq.minimal.hocon index b747669..3b4da36 100644 --- a/config/config.nsq.minimal.hocon +++ b/config/config.nsq.minimal.hocon @@ -12,6 +12,9 @@ "client": { "endpoint": "localhost" } + "cluster": { + "index": "good" + } } "bad" { "type": "nsq" diff --git a/core/src/main/resources/application.conf b/core/src/main/resources/application.conf index 4cdf0f6..2f1869c 100644 --- a/core/src/main/resources/application.conf +++ b/core/src/main/resources/application.conf @@ -24,9 +24,6 @@ "aws": { "signing": false } - "cluster": { - "index": "good" - } } } "monitoring": { diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala index 848d24c..e50e379 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala @@ -28,12 +28,14 @@ import com.monovore.decline.{Command, Opts} import cats.syntax.either._ import cats.syntax.validated._ -import com.amazonaws.regions.DefaultAwsRegionProviderChain +import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} -import pureconfig.{CamelCase, ConfigFieldMapping, ConfigObjectSource, ConfigReader, ConfigSource} +import com.typesafe.config.ConfigOrigin + +import pureconfig._ import 
pureconfig.generic.{FieldCoproductHint, ProductHint} import pureconfig.generic.semiauto._ -import pureconfig.error.{ConfigReaderFailures, FailureReason} +import pureconfig.error._ object Config { @@ -68,7 +70,7 @@ object Config { initialPosition: String, initialTimestamp: Option[String], maxRecords: Long, - region: Option[String], + region: Region, appName: String, customEndpoint: Option[String], dynamodbCustomEndpoint: Option[String], @@ -91,15 +93,6 @@ object Config { object Kinesis { final case class Buffer(byteLimit: Long, recordLimit: Long, timeLimit: Long) - - implicit val sourceKinesisConfigReader: ConfigReader[Kinesis] = - deriveReader[Kinesis].emap { c => - val region = c.region.orElse(getRegion) - region match { - case Some(_) => c.copy(region = region).asRight - case _ => RawFailureReason("Region isn't set in the Kinesis source").asLeft - } - } } } @@ -131,17 +124,7 @@ object Config { ssl: Boolean ) - final case class ESAWS(signing: Boolean, region: Option[String]) - object ESAWS { - implicit val sinkGoodESAWSConfigReader: ConfigReader[ESAWS] = - deriveReader[ESAWS].emap { c => - val region = c.region.orElse(getRegion) - if (c.signing && region.isEmpty) - RawFailureReason("Region needs to be set when AWS signing is true").asLeft - else - c.copy(region = region).asRight - } - } + final case class ESAWS(signing: Boolean, region: Region) final case class ESCluster(index: String, documentType: Option[String]) @@ -163,19 +146,9 @@ object Config { final case class Kinesis( streamName: String, - region: Option[String], + region: Region, customEndpoint: Option[String] ) extends BadSink - object Kinesis { - implicit val sinkBadKinesisConfigReader: ConfigReader[Kinesis] = - deriveReader[Kinesis].emap { c => - val region = c.region.orElse(getRegion) - region match { - case Some(_) => c.copy(region = region).asRight - case _ => RawFailureReason("Region isn't set in the Kinesis sink").asLeft - } - } - } } } @@ -208,48 +181,81 @@ object Config { final case class 
Metrics(cloudWatch: Boolean) } - final case class RawFailureReason(description: String) extends FailureReason + final case class Region(name: String) - implicit val streamLoaderConfigReader: ConfigReader[StreamLoaderConfig] = - deriveReader[StreamLoaderConfig] - implicit val sourceConfigReader: ConfigReader[Source] = - deriveReader[Source] - implicit val sourceStdinConfigReader: ConfigReader[Source.Stdin.type] = - deriveReader[Source.Stdin.type] - implicit val sourceNsqConfigReader: ConfigReader[Source.Nsq] = - deriveReader[Source.Nsq] - implicit val sourceNsqBufferConfigReader: ConfigReader[Source.Nsq.Buffer] = - deriveReader[Source.Nsq.Buffer] - implicit val sourceKinesisConfigBufferReader: ConfigReader[Source.Kinesis.Buffer] = - deriveReader[Source.Kinesis.Buffer] - implicit val sinkConfigReader: ConfigReader[Sink] = - deriveReader[Sink] - implicit val sinkGoodConfigReader: ConfigReader[Sink.GoodSink] = - deriveReader[Sink.GoodSink] - implicit val sinkGoodStdoutConfigReader: ConfigReader[Sink.GoodSink.Stdout.type] = - deriveReader[Sink.GoodSink.Stdout.type] - implicit val sinkGoodESConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch] = - deriveReader[Sink.GoodSink.Elasticsearch] - implicit val sinkGoodESClientConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESClient] = - deriveReader[Sink.GoodSink.Elasticsearch.ESClient] - implicit val sinkGoodESClusterConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESCluster] = - deriveReader[Sink.GoodSink.Elasticsearch.ESCluster] - implicit val sinkGoodESChunkConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESChunk] = - deriveReader[Sink.GoodSink.Elasticsearch.ESChunk] - implicit val sinkBadSinkConfigReader: ConfigReader[Sink.BadSink] = - deriveReader[Sink.BadSink] - implicit val sinkBadNoneConfigReader: ConfigReader[Sink.BadSink.None.type] = - deriveReader[Sink.BadSink.None.type] - implicit val sinkBadStderrConfigReader: ConfigReader[Sink.BadSink.Stderr.type] = - deriveReader[Sink.BadSink.Stderr.type] - 
implicit val sinkBadNsqConfigReader: ConfigReader[Sink.BadSink.Nsq] = - deriveReader[Sink.BadSink.Nsq] - implicit val monitoringConfigReader: ConfigReader[Monitoring] = - deriveReader[Monitoring] - implicit val snowplowMonitoringConfig: ConfigReader[Monitoring.SnowplowMonitoring] = - deriveReader[Monitoring.SnowplowMonitoring] - implicit val metricsConfigReader: ConfigReader[Monitoring.Metrics] = - deriveReader[Monitoring.Metrics] + final case class RawFailureReason(description: String) extends FailureReason + final case class RawConfigReaderFailure(description: String, origin: Option[ConfigOrigin] = None) + extends ConfigReaderFailure + + case class implicits( + regionConfigReader: ConfigReader[Region] with ReadsMissingKeys = new ConfigReader[Region] + with ReadsMissingKeys { + override def from(cur: ConfigCursor) = + if (cur.isUndefined) + Config.getRegion.toRight( + ConfigReaderFailures( + RawConfigReaderFailure( + "Region can not be resolved, needs to be passed explicitly" + ) + ) + ) + else + cur.asString.flatMap { r => + val region = Region(r) + checkRegion(region).leftMap(e => ConfigReaderFailures(RawConfigReaderFailure(e))) + } + } + ) { + implicit val implRegionConfigReader: ConfigReader[Region] = regionConfigReader + implicit val streamLoaderConfigReader: ConfigReader[StreamLoaderConfig] = + deriveReader[StreamLoaderConfig] + implicit val sourceConfigReader: ConfigReader[Source] = + deriveReader[Source] + implicit val sourceStdinConfigReader: ConfigReader[Source.Stdin.type] = + deriveReader[Source.Stdin.type] + implicit val sourceNsqConfigReader: ConfigReader[Source.Nsq] = + deriveReader[Source.Nsq] + implicit val sourceNsqBufferConfigReader: ConfigReader[Source.Nsq.Buffer] = + deriveReader[Source.Nsq.Buffer] + implicit val sourceKinesisConfigReader: ConfigReader[Source.Kinesis] = + deriveReader[Source.Kinesis] + implicit val sourceKinesisConfigBufferReader: ConfigReader[Source.Kinesis.Buffer] = + deriveReader[Source.Kinesis.Buffer] + implicit val 
sinkConfigReader: ConfigReader[Sink] = + deriveReader[Sink] + implicit val sinkGoodConfigReader: ConfigReader[Sink.GoodSink] = + deriveReader[Sink.GoodSink] + implicit val sinkGoodStdoutConfigReader: ConfigReader[Sink.GoodSink.Stdout.type] = + deriveReader[Sink.GoodSink.Stdout.type] + implicit val sinkGoodESConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch] = + deriveReader[Sink.GoodSink.Elasticsearch] + implicit val sinkGoodESClientConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESClient] = + deriveReader[Sink.GoodSink.Elasticsearch.ESClient] + implicit val sinkGoodESClusterConfigReader: ConfigReader[ + Sink.GoodSink.Elasticsearch.ESCluster + ] = + deriveReader[Sink.GoodSink.Elasticsearch.ESCluster] + implicit val sinkGoodESAWSConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESAWS] = + deriveReader[Sink.GoodSink.Elasticsearch.ESAWS] + implicit val sinkGoodESChunkConfigReader: ConfigReader[Sink.GoodSink.Elasticsearch.ESChunk] = + deriveReader[Sink.GoodSink.Elasticsearch.ESChunk] + implicit val sinkBadSinkConfigReader: ConfigReader[Sink.BadSink] = + deriveReader[Sink.BadSink] + implicit val sinkBadKinesisConfigReader: ConfigReader[Sink.BadSink.Kinesis] = + deriveReader[Sink.BadSink.Kinesis] + implicit val sinkBadNoneConfigReader: ConfigReader[Sink.BadSink.None.type] = + deriveReader[Sink.BadSink.None.type] + implicit val sinkBadStderrConfigReader: ConfigReader[Sink.BadSink.Stderr.type] = + deriveReader[Sink.BadSink.Stderr.type] + implicit val sinkBadNsqConfigReader: ConfigReader[Sink.BadSink.Nsq] = + deriveReader[Sink.BadSink.Nsq] + implicit val monitoringConfigReader: ConfigReader[Monitoring] = + deriveReader[Monitoring] + implicit val snowplowMonitoringConfig: ConfigReader[Monitoring.SnowplowMonitoring] = + deriveReader[Monitoring.SnowplowMonitoring] + implicit val metricsConfigReader: ConfigReader[Monitoring.Metrics] = + deriveReader[Monitoring.Metrics] + } val config = Opts .option[Path]("config", "Path to a HOCON configuration file") @@ -263,7 
+269,16 @@ object Config { val command = Command("snowplow-stream-loader", generated.Settings.version, true)(config) - def parseConfig(arguments: Array[String]): Either[String, StreamLoaderConfig] = + def parseConfig(arguments: Array[String]): Either[String, StreamLoaderConfig] = { + val configImplicits = com.snowplowanalytics.stream.loader.Config.implicits() + parseConfig(arguments, configImplicits.streamLoaderConfigReader) + } + + def parseConfig( + arguments: Array[String], + configReader: ConfigReader[StreamLoaderConfig] + ): Either[String, StreamLoaderConfig] = { + implicit val implConfigReader: ConfigReader[StreamLoaderConfig] = configReader for { path <- command.parse(arguments).leftMap(_.toString) source = path.fold(ConfigSource.empty)(ConfigSource.file) @@ -272,6 +287,7 @@ object Config { ) parsed <- c.load[StreamLoaderConfig].leftMap(showFailures) } yield parsed + } /** Optionally give precedence to configs wrapped in a "esloader" block. To help avoid polluting config namespace */ private def namespaced(configObjSource: ConfigObjectSource): ConfigObjectSource = @@ -295,8 +311,14 @@ object Config { failureStrings.mkString("\n") } - private def getRegion: Option[String] = - Either.catchNonFatal((new DefaultAwsRegionProviderChain).getRegion).toOption + private def getRegion: Option[Region] = + Either.catchNonFatal((new DefaultAwsRegionProviderChain).getRegion).toOption.map(Region) + + private def checkRegion(region: Region): Either[String, Region] = { + val allRegions = Regions.values().toList.map(_.getName) + if (allRegions.contains(region.name)) region.asRight + else s"Region ${region.name} is unknown, choose from [${allRegions.mkString(", ")}]".asLeft + } // Used as an option prefix when reading system properties. 
val Namespace = "snowplow" diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala index 478fd88..876c2d5 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala @@ -87,12 +87,12 @@ class KinesisSourceExecutor[A, B]( + kcc.CONNECTOR_DESTINATION + "," + KinesisConnectorConfiguration.KINESIS_CONNECTOR_USER_AGENT ) + .withRegionName(kcc.REGION_NAME) .condWith(kinesis.customEndpoint.isDefined, _.withKinesisEndpoint(kcc.KINESIS_ENDPOINT)) .condWith( kinesis.dynamodbCustomEndpoint.isDefined, _.withDynamoDBEndpoint(kcc.DYNAMODB_ENDPOINT) ) - .condWith(kinesis.region.isDefined, _.withRegionName(kcc.REGION_NAME)) timestamp .filter(_ => initialPosition == "AT_TIMESTAMP") @@ -120,7 +120,7 @@ class KinesisSourceExecutor[A, B]( props.setProperty(KinesisConnectorConfiguration.PROP_DYNAMODB_ENDPOINT, _) ) // So that the region of the DynamoDB table is correct - kinesis.region.foreach(props.setProperty(KinesisConnectorConfiguration.PROP_REGION_NAME, _)) + props.setProperty(KinesisConnectorConfiguration.PROP_REGION_NAME, kinesis.region.name) props.setProperty(KinesisConnectorConfiguration.PROP_APP_NAME, kinesis.appName.trim) props.setProperty( KinesisConnectorConfiguration.PROP_INITIAL_POSITION_IN_STREAM, diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala index 94d8753..34319db 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala @@ -54,13 +54,10 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink { val client = 
AmazonKinesisClientBuilder .standard() .withCredentials(new DefaultAWSCredentialsProviderChain()) - .optWith[String](conf.region, _.withRegion) - .optWith[(String, String)]( - for { - e <- conf.customEndpoint - r <- conf.region - } yield (e, r), - b => { case (e, r) => b.withEndpointConfiguration(new EndpointConfiguration(e, r)) } + .withRegion(conf.region.name) + .optWith[String]( + conf.customEndpoint, + b => e => b.withEndpointConfiguration(new EndpointConfiguration(e, conf.region.name)) ) .build() diff --git a/core/src/test/resources/config.test1.hocon b/core/src/test/resources/config.test1.hocon new file mode 100644 index 0000000..f054438 --- /dev/null +++ b/core/src/test/resources/config.test1.hocon @@ -0,0 +1,49 @@ +{ + "input": { + "type": "kinesis" + "streamName": "test-kinesis-stream" + "appName": "test-app-name" + "initialPosition": "LATEST" + "region": "ca-central-1" + "maxRecords": 2000 + "buffer": { + "byteLimit": 201 + "recordLimit": 202 + "timeLimit": 203 + } + } + "output": { + "type": "elasticsearch" + "good": { + "client": { + "endpoint": "localhost" + "maxTimeout": 205 + "maxRetries": 7 + "port": 9220 + "ssl": true + } + "chunk": { + "byteLimit": 206 + "recordLimit": 207 + } + "aws": { + "signing": true + "region": "ca-central-1" + } + "cluster": { + "index": "testindex" + } + } + "bad" { + "type": "kinesis" + "streamName": "test-kinesis-bad-stream" + "region": "ca-central-1" + } + } + "purpose": "BAD_ROWS" + "monitoring": { + "metrics": { + "cloudWatch": false + } + } +} diff --git a/core/src/test/resources/config.test2.hocon b/core/src/test/resources/config.test2.hocon new file mode 100644 index 0000000..094ae62 --- /dev/null +++ b/core/src/test/resources/config.test2.hocon @@ -0,0 +1,23 @@ +{ + "input": { + "type": "kinesis" + "streamName": "test-kinesis-stream" + "region": "ca-central-1" + "initialPosition": "LATEST" + } + "output": { + "good": { + "client": { + "endpoint": "localhost" + } + "cluster": { + "index": "good" + } + } + "bad" { + 
"type": "kinesis" + "streamName": "test-kinesis-bad-stream" + } + } + "purpose": "ENRICHED_EVENTS" +} diff --git a/core/src/test/resources/config.test3.hocon b/core/src/test/resources/config.test3.hocon new file mode 100644 index 0000000..0d48513 --- /dev/null +++ b/core/src/test/resources/config.test3.hocon @@ -0,0 +1,28 @@ +{ + "input": { + "type": "kinesis" + "streamName": "test-kinesis-stream" + "region": "ca-central-1" + "initialPosition": "LATEST" + } + "output": { + "good": { + "client": { + "endpoint": "localhost" + } + "cluster": { + "index": "good" + } + "aws": { + "signing": true + "region": "af-south-1" + } + } + "bad" { + "type": "kinesis" + "streamName": "test-kinesis-bad-stream" + "region": "eu-west-2" + } + } + "purpose": "ENRICHED_EVENTS" +} diff --git a/core/src/test/resources/config.test4.hocon b/core/src/test/resources/config.test4.hocon new file mode 100644 index 0000000..0e0e471 --- /dev/null +++ b/core/src/test/resources/config.test4.hocon @@ -0,0 +1,28 @@ +{ + "input": { + "type": "kinesis" + "streamName": "test-kinesis-stream" + "region": "ca-central-1" + "initialPosition": "LATEST" + } + "output": { + "good": { + "client": { + "endpoint": "localhost" + } + "cluster": { + "index": "good" + } + "aws": { + "signing": true + "region": "unknown-region-1" + } + } + "bad" { + "type": "kinesis" + "streamName": "test-kinesis-bad-stream" + "region": "unknown-region-2" + } + } + "purpose": "ENRICHED_EVENTS" +} diff --git a/core/src/test/resources/config.test5.hocon b/core/src/test/resources/config.test5.hocon new file mode 100644 index 0000000..763589d --- /dev/null +++ b/core/src/test/resources/config.test5.hocon @@ -0,0 +1,25 @@ +{ + "input": { + "type": "kinesis" + "region": "ca-central-1" + "initialPosition": "LATEST" + } + "output": { + "good": { + "client": { + } + "cluster": { + "index": "good" + } + "aws": { + "signing": true + "region": "unknown-region-1" + } + } + "bad" { + "streamName": "test-kinesis-bad-stream" + "region": 
"unknown-region-2" + } + } + "purpose": "ENRICHED_EVENTS" +} diff --git a/core/src/test/scala/com.snowplowanalytics.stream.loader/ConfigSpec.scala b/core/src/test/scala/com.snowplowanalytics.stream.loader/ConfigSpec.scala index 1c2c3ff..2f9157f 100644 --- a/core/src/test/scala/com.snowplowanalytics.stream.loader/ConfigSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.stream.loader/ConfigSpec.scala @@ -24,9 +24,23 @@ import cats.syntax.option._ import org.specs2.mutable.Specification +import pureconfig._ + import com.snowplowanalytics.stream.loader.Config._ class ConfigSpec extends Specification { + private val DefaultTestRegion = Region("ap-east-1") + private val RealConfigReader = + com.snowplowanalytics.stream.loader.Config.implicits().streamLoaderConfigReader + private val TestConfigReader = com.snowplowanalytics.stream.loader.Config + .implicits( + new ConfigReader[Region] with ReadsMissingKeys { + override def from(cur: ConfigCursor) = + if (cur.isUndefined) Right(DefaultTestRegion) + else cur.asString.map(Region) + } + ) + .streamLoaderConfigReader "Config.parseConfig" should { "accept example extended kinesis config" >> { @@ -39,7 +53,7 @@ class ConfigSpec extends Specification { "AT_TIMESTAMP", "2020-07-17T10:00:00Z".some, 9999, - "eu-central-1".some, + Region("eu-central-1"), "test-app-name", "127.0.0.1".some, "http://localhost:4569".some, @@ -58,12 +72,12 @@ class ConfigSpec extends Specification { 5, true ), - Sink.GoodSink.Elasticsearch.ESAWS(true, "eu-central-1".some), + Sink.GoodSink.Elasticsearch.ESAWS(true, Region("eu-central-1")), Sink.GoodSink.Elasticsearch.ESCluster("good", "good-doc".some), Sink.GoodSink.Elasticsearch.ESChunk(999999, 499) ), Sink.BadSink - .Kinesis("test-kinesis-bad-stream", "eu-central-1".some, "127.0.0.1:7846".some) + .Kinesis("test-kinesis-bad-stream", Region("eu-central-1"), "127.0.0.1:7846".some) ), Purpose.Enriched, Monitoring( @@ -72,7 +86,7 @@ class ConfigSpec extends Specification { ) ) - val result = 
Config.parseConfig(argv) + val result = testParseConfig(argv) result must beRight(expected) } @@ -86,7 +100,7 @@ class ConfigSpec extends Specification { "LATEST", None, 10000, - "eu-central-1".some, + DefaultTestRegion, "snowplow-elasticsearch-loader", None, None, @@ -105,20 +119,20 @@ class ConfigSpec extends Specification { 6, false ), - Sink.GoodSink.Elasticsearch.ESAWS(false, None), + Sink.GoodSink.Elasticsearch.ESAWS(false, DefaultTestRegion), Sink.GoodSink.Elasticsearch.ESCluster("good", None), Sink.GoodSink.Elasticsearch.ESChunk(1000000, 500) ), - Sink.BadSink.Kinesis("test-kinesis-bad-stream", "eu-central-1".some, None) + Sink.BadSink.Kinesis("test-kinesis-bad-stream", DefaultTestRegion, None) ), - Purpose.Bad, + Purpose.Enriched, Monitoring( None, Monitoring.Metrics(true) ) ) - val result = Config.parseConfig(argv) + val result = testParseConfig(argv) result must beRight(expected) } @@ -147,7 +161,7 @@ class ConfigSpec extends Specification { 5, true ), - Sink.GoodSink.Elasticsearch.ESAWS(true, "eu-central-1".some), + Sink.GoodSink.Elasticsearch.ESAWS(true, Region("eu-central-1")), Sink.GoodSink.Elasticsearch.ESCluster("good", "good-doc".some), Sink.GoodSink.Elasticsearch.ESChunk(999999, 499) ), @@ -160,7 +174,7 @@ class ConfigSpec extends Specification { ) ) - val result = Config.parseConfig(argv) + val result = testParseConfig(argv) result must beRight(expected) } @@ -189,7 +203,7 @@ class ConfigSpec extends Specification { 6, false ), - Sink.GoodSink.Elasticsearch.ESAWS(false, None), + Sink.GoodSink.Elasticsearch.ESAWS(false, DefaultTestRegion), Sink.GoodSink.Elasticsearch.ESCluster("good", None), Sink.GoodSink.Elasticsearch.ESChunk(1000000, 500) ), @@ -202,7 +216,7 @@ class ConfigSpec extends Specification { ) ) - val result = Config.parseConfig(argv) + val result = testParseConfig(argv) result must beRight(expected) } @@ -223,7 +237,7 @@ class ConfigSpec extends Specification { ) ) - val result = Config.parseConfig(argv) + val result = 
testParseConfig(argv) result must beRight(expected) } @@ -244,8 +258,177 @@ class ConfigSpec extends Specification { ) ) - val result = Config.parseConfig(argv) + val result = testParseConfig(argv) result must beRight(expected) } + + "override default values" >> { + val config = Paths.get(getClass.getResource("/config.test1.hocon").toURI) + val argv = Array("--config", config.toString) + + val expected = StreamLoaderConfig( + Source.Kinesis( + "test-kinesis-stream", + "LATEST", + None, + 2000, + Region("ca-central-1"), + "test-app-name", + None, + None, + Source.Kinesis.Buffer(201, 202, 203) + ), + Sink( + Sink.GoodSink.Elasticsearch( + Sink.GoodSink.Elasticsearch.ESClient( + "localhost", + 9220, + None, + None, + None, + None, + 205, + 7, + true + ), + Sink.GoodSink.Elasticsearch.ESAWS(true, Region("ca-central-1")), + Sink.GoodSink.Elasticsearch.ESCluster("testindex", None), + Sink.GoodSink.Elasticsearch.ESChunk(206, 207) + ), + Sink.BadSink.Kinesis("test-kinesis-bad-stream", Region("ca-central-1"), None) + ), + Purpose.Bad, + Monitoring( + None, + Monitoring.Metrics(false) + ) + ) + + val result = testParseConfig(argv) + result must beRight(expected) + } + + "set region correctly with test config reader" >> { + val config = Paths.get(getClass.getResource("/config.test2.hocon").toURI) + val argv = Array("--config", config.toString) + + val expected = StreamLoaderConfig( + Source.Kinesis( + "test-kinesis-stream", + "LATEST", + None, + 10000, + Region("ca-central-1"), + "snowplow-elasticsearch-loader", + None, + None, + Source.Kinesis.Buffer(1000000, 500, 500) + ), + Sink( + Sink.GoodSink.Elasticsearch( + Sink.GoodSink.Elasticsearch.ESClient( + "localhost", + 9200, + None, + None, + None, + None, + 10000, + 6, + false + ), + Sink.GoodSink.Elasticsearch.ESAWS(false, DefaultTestRegion), + Sink.GoodSink.Elasticsearch.ESCluster("good", None), + Sink.GoodSink.Elasticsearch.ESChunk(1000000, 500) + ), + Sink.BadSink.Kinesis("test-kinesis-bad-stream", DefaultTestRegion, 
None) + ), + Purpose.Enriched, + Monitoring( + None, + Monitoring.Metrics(true) + ) + ) + + val result = testParseConfig(argv) + result must beRight(expected) + } + + "set region correctly with real config reader" >> { + val config = Paths.get(getClass.getResource("/config.test3.hocon").toURI) + val argv = Array("--config", config.toString) + + val expected = StreamLoaderConfig( + Source.Kinesis( + "test-kinesis-stream", + "LATEST", + None, + 10000, + Region("ca-central-1"), + "snowplow-elasticsearch-loader", + None, + None, + Source.Kinesis.Buffer(1000000, 500, 500) + ), + Sink( + Sink.GoodSink.Elasticsearch( + Sink.GoodSink.Elasticsearch.ESClient( + "localhost", + 9200, + None, + None, + None, + None, + 10000, + 6, + false + ), + Sink.GoodSink.Elasticsearch.ESAWS(true, Region("af-south-1")), + Sink.GoodSink.Elasticsearch.ESCluster("good", None), + Sink.GoodSink.Elasticsearch.ESChunk(1000000, 500) + ), + Sink.BadSink.Kinesis("test-kinesis-bad-stream", Region("eu-west-2"), None) + ), + Purpose.Enriched, + Monitoring( + None, + Monitoring.Metrics(true) + ) + ) + + val result = testParseConfig(argv, RealConfigReader) + result must beRight(expected) + } + + "give error when unknown region given" >> { + val config = Paths.get(getClass.getResource("/config.test4.hocon").toURI) + val argv = Array("--config", config.toString) + + val result = testParseConfig(argv, RealConfigReader) + result.fold( + err => err.contains("unknown-region-1") && err.contains("unknown-region-2"), + _ => false + ) must beTrue + } + + "give error when required fields are missing" >> { + val config = Paths.get(getClass.getResource("/config.test5.hocon").toURI) + val argv = Array("--config", config.toString) + + val result = testParseConfig(argv) + result.fold( + err => + err.contains("streamName") + && err.contains("endpoint") + && err.contains("type"), + _ => false + ) must beTrue + } } + + private def testParseConfig( + args: Array[String], + configReader: ConfigReader[StreamLoaderConfig] = 
TestConfigReader + ) = + Config.parseConfig(args, configReader) } diff --git a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala index 355c96a..a385810 100644 --- a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala +++ b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala @@ -47,6 +47,7 @@ import retry.CatsEffect._ import com.snowplowanalytics.snowplow.scalatracker.Tracker +import com.snowplowanalytics.stream.loader.Config.Region import com.snowplowanalytics.stream.loader.Config.Sink.GoodSink import com.snowplowanalytics.stream.loader.Config.Sink.GoodSink.Elasticsearch.ESChunk @@ -58,7 +59,8 @@ class ElasticsearchBulkSender( endpoint: String, port: Int, ssl: Boolean, - awsSigningRegion: Option[String], + awsSigning: Boolean, + awsSigningRegion: Region, username: Option[String], password: Option[String], documentIndex: String, @@ -76,9 +78,9 @@ class ElasticsearchBulkSender( override val log = LoggerFactory.getLogger(getClass) private val client = { - val httpClientConfigCallback = awsSigningRegion - .map(new SignedHttpClientConfigCallback(_)) - .getOrElse(NoOpHttpClientConfigCallback) + val httpClientConfigCallback = + if (awsSigning) new SignedHttpClientConfigCallback(awsSigningRegion) + else NoOpHttpClientConfigCallback val formedHost = new HttpHost(endpoint, port, if (ssl) "https" else "http") val headers: Array[Header] = (username, password) match { case (Some(_), Some(_)) => @@ -241,6 +243,7 @@ object ElasticsearchBulkSender { config.client.endpoint, config.client.port, config.client.ssl, + config.aws.signing, config.aws.region, config.client.username, config.client.password, diff --git a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala 
b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala index b52c744..2e3d89b 100644 --- a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala +++ b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala @@ -40,18 +40,20 @@ import io.ticofab.AwsSigner // Scala import scala.util.Try +import com.snowplowanalytics.stream.loader.Config.Region + /** * Signs outgoing HTTP requests to AWS Elasticsearch service * @param credentialsProvider AWS credentials provider * @param region in which to sign the requests */ -class SignedHttpClientConfigCallback(region: String) extends HttpClientConfigCallback { +class SignedHttpClientConfigCallback(region: Region) extends HttpClientConfigCallback { private def clock(): LocalDateTime = LocalDateTime.now(ZoneId.of("UTC")) private val service = "es" private val signer = AwsSigner( new DefaultAWSCredentialsProviderChain(), - region, + region.name, service, () => SignedHttpClientConfigCallback.this.clock() ) diff --git a/elasticsearch/src/test/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSenderSpec.scala b/elasticsearch/src/test/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSenderSpec.scala index 00e95a3..2c27b8e 100644 --- a/elasticsearch/src/test/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSenderSpec.scala +++ b/elasticsearch/src/test/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSenderSpec.scala @@ -25,6 +25,7 @@ import io.circe.literal._ import com.snowplowanalytics.stream.loader.{EmitterJsonInput, JsonRecord} import com.snowplowanalytics.stream.loader.Config.Sink.GoodSink.Elasticsearch.ESChunk +import com.snowplowanalytics.stream.loader.Config.Region // specs2 import org.specs2.mutable.Specification @@ -38,7 +39,8 @@ class ElasticsearchBulkSenderSpec extends Specification { 
elasticHost, elasticPort, false, - Some("region"), + false, + Region("region"), None, None, index,