From 67c474f34d688b3854912275ba85886aecbb5536 Mon Sep 17 00:00:00 2001 From: spenes Date: Wed, 29 Sep 2021 15:51:51 +0300 Subject: [PATCH] Small changes --- config/config.kinesis.reference.hocon | 6 ++++++ .../com.snowplowanalytics.stream/loader/Config.scala | 8 ++++++++ .../loader/executors/KinesisSourceExecutor.scala | 5 ++--- .../loader/executors/NsqSourceExecutor.scala | 2 -- .../loader/sinks/KinesisSink.scala | 8 +++++++- .../stream/loader/ElasticsearchLoader.scala | 2 -- .../stream/loader/clients/ElasticsearchBulkSender.scala | 2 +- .../loader/clients/SignedHttpClientConfigCallback.scala | 1 - 8 files changed, 24 insertions(+), 10 deletions(-) diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index ceb397a..a1dc0c1 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -29,6 +29,9 @@ "maxRecords": 9999 # Region where the Kinesis stream is located + # This field is optional if the region can be resolved by the AWS region provider chain. + # The chain checks places such as env variables, system properties and the AWS profile file. + # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html "region": "eu-central-1" # Optional endpoint url configuration to override aws kinesis endpoints, @@ -128,6 +131,9 @@ "streamName": "test-kinesis-bad-stream" # Region where the Kinesis stream is located + # This field is optional if the region can be resolved by the AWS region provider chain. + # The chain checks places such as env variables, system properties and the AWS profile file. 
+ # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html "region": "eu-central-1" # Optional endpoint url configuration to override aws kinesis endpoints, diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala index e50e379..0913575 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/Config.scala @@ -187,6 +187,14 @@ object Config { final case class RawConfigReaderFailure(description: String, origin: Option[ConfigOrigin] = None) extends ConfigReaderFailure + /** + * All config implicits are wrapped in a case class so that the region config reader + * can be swapped out when writing unit tests for config parsing. + * The region config reader is special because it allows Region fields to be missing. + * If a region is missing, it tries to resolve it with "DefaultAwsRegionProviderChain". + * If that also fails, it throws an error. In the tests, it is replaced with a dummy config + * reader so that "DefaultAwsRegionProviderChain" is not used during tests. 
+ */ case class implicits( regionConfigReader: ConfigReader[Region] with ReadsMissingKeys = new ConfigReader[Region] with ReadsMissingKeys { diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala index 876c2d5..2ed36c0 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/KinesisSourceExecutor.scala @@ -45,13 +45,12 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.snowplowanalytics.stream.loader.Config._ /** - * Boilerplate class for Kinesis Conenector - * @param streamLoaderConfig streamLoaderConfig + * Boilerplate class for Kinesis Connector * @param kinesis queue settings + * @param metrics metrics settings * @param kinesisConnectorPipeline kinesisConnectorPipeline */ class KinesisSourceExecutor[A, B]( - streamLoaderConfig: StreamLoaderConfig, kinesis: Source.Kinesis, metrics: Monitoring.Metrics, kinesisConnectorPipeline: IKinesisConnectorPipeline[A, B] diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala index 18e2634..465d872 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/executors/NsqSourceExecutor.scala @@ -49,14 +49,12 @@ import transformers.{BadEventTransformer, EnrichedEventJsonTransformer, JsonTran * * @param purpose kind of data stored, good, bad or plain-json * @param nsq Nsq NsqConfig - * @param config ESLoader Configuration * @param goodSink the configured GoodSink * @param badSink the configured BadSink */ class NsqSourceExecutor( purpose: Purpose, nsq: Source.Nsq, - config: StreamLoaderConfig, 
goodSink: Either[ISink, BulkSender[EmitterJsonInput]], badSink: ISink, shardDateField: Option[String], diff --git a/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala b/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala index 34319db..4ad71b3 100644 --- a/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala +++ b/core/src/main/scala/com.snowplowanalytics.stream/loader/sinks/KinesisSink.scala @@ -54,7 +54,8 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink { val client = AmazonKinesisClientBuilder .standard() .withCredentials(new DefaultAWSCredentialsProviderChain()) - .withRegion(conf.region.name) + // Region should be set in the EndpointConfiguration when custom endpoint is used + .condWith(conf.customEndpoint.isEmpty, _.withRegion(conf.region.name)) .optWith[String]( conf.customEndpoint, b => e => b.withEndpointConfiguration(new EndpointConfiguration(e, conf.region.name)) @@ -121,5 +122,10 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink { f: AmazonKinesisClientBuilder => A => AmazonKinesisClientBuilder ): AmazonKinesisClientBuilder = opt.map(f(builder)).getOrElse(builder) + def condWith[A]( + cond: => Boolean, + f: AmazonKinesisClientBuilder => AmazonKinesisClientBuilder + ): AmazonKinesisClientBuilder = + if (cond) f(builder) else builder } } diff --git a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala index 2b538d9..010066f 100644 --- a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala +++ b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/ElasticsearchLoader.scala @@ -86,7 +86,6 @@ object ElasticsearchLoader { shardDateFormat ) new KinesisSourceExecutor[ValidatedJsonRecord, EmitterJsonInput]( - config, c, config.monitoring.metrics, pipeline @@ -97,7 +96,6 @@ object 
ElasticsearchLoader { new NsqSourceExecutor( config.purpose, c, - config, goodSink, badSink, shardDateField, diff --git a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala index a385810..58f1d3d 100644 --- a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala +++ b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/ElasticsearchBulkSender.scala @@ -148,7 +148,7 @@ class ElasticsearchBulkSender( } } else Nil - log.info(s"Emitted ${esObjects.size - newFailures.size} records to Elasticseacrch") + log.info(s"Emitted ${esObjects.size - newFailures.size} records to Elasticsearch") if (newFailures.nonEmpty) logHealth() val allFailures = oldFailures ++ newFailures diff --git a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala index 2e3d89b..915d31d 100644 --- a/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala +++ b/elasticsearch/src/main/scala/com/snowplowanalytics/stream/loader/clients/SignedHttpClientConfigCallback.scala @@ -44,7 +44,6 @@ import com.snowplowanalytics.stream.loader.Config.Region /** * Signs outgoing HTTP requests to AWS Elasticsearch service - * @param credentialsProvider AWS credentials provider * @param region in which to sign the requests */ class SignedHttpClientConfigCallback(region: Region) extends HttpClientConfigCallback {