Skip to content

Commit

Permalink
Small changes
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 29, 2021
1 parent dc1a49c commit 67c474f
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 10 deletions.
6 changes: 6 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
"maxRecords": 9999

# Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional endpoint url configuration to override aws kinesis endpoints,
Expand Down Expand Up @@ -128,6 +131,9 @@
"streamName": "test-kinesis-bad-stream"

# Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional endpoint url configuration to override aws kinesis endpoints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ object Config {
final case class RawConfigReaderFailure(description: String, origin: Option[ConfigOrigin] = None)
extends ConfigReaderFailure

/**
* All config implicits are put into case class because we want to make region config reader
* changeable to write unit tests for config parsing.
* Region config reader is special config reader because it allows Region types to be missing.
* If they are missing, it tries to retrieve region with "DefaultAwsRegionProviderChain".
* If it is also unsuccessful, it throws error. In the tests, it is changed with dummy config
* reader in order to not use "DefaultAwsRegionProviderChain" during tests.
*/
case class implicits(
regionConfigReader: ConfigReader[Region] with ReadsMissingKeys = new ConfigReader[Region]
with ReadsMissingKeys {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.snowplowanalytics.stream.loader.Config._

/**
* Boilerplate class for Kinesis Conenector
* @param streamLoaderConfig streamLoaderConfig
* Boilerplate class for Kinesis Connector
* @param kinesis queue settings
* @param metrics metrics settings
* @param kinesisConnectorPipeline kinesisConnectorPipeline
*/
class KinesisSourceExecutor[A, B](
streamLoaderConfig: StreamLoaderConfig,
kinesis: Source.Kinesis,
metrics: Monitoring.Metrics,
kinesisConnectorPipeline: IKinesisConnectorPipeline[A, B]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ import transformers.{BadEventTransformer, EnrichedEventJsonTransformer, JsonTran
*
* @param purpose kind of data stored, good, bad or plain-json
* @param nsq Nsq NsqConfig
* @param config ESLoader Configuration
* @param goodSink the configured GoodSink
* @param badSink the configured BadSink
*/
class NsqSourceExecutor(
purpose: Purpose,
nsq: Source.Nsq,
config: StreamLoaderConfig,
goodSink: Either[ISink, BulkSender[EmitterJsonInput]],
badSink: ISink,
shardDateField: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink {
val client = AmazonKinesisClientBuilder
.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain())
.withRegion(conf.region.name)
// Region should be set in the EndpointConfiguration when custom endpoint is used
.condWith(conf.customEndpoint.isEmpty, _.withRegion(conf.region.name))
.optWith[String](
conf.customEndpoint,
b => e => b.withEndpointConfiguration(new EndpointConfiguration(e, conf.region.name))
Expand Down Expand Up @@ -121,5 +122,10 @@ class KinesisSink(conf: KinesisSinkConfig) extends ISink {
f: AmazonKinesisClientBuilder => A => AmazonKinesisClientBuilder
): AmazonKinesisClientBuilder =
opt.map(f(builder)).getOrElse(builder)
def condWith[A](
cond: => Boolean,
f: AmazonKinesisClientBuilder => AmazonKinesisClientBuilder
): AmazonKinesisClientBuilder =
if (cond) f(builder) else builder
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ object ElasticsearchLoader {
shardDateFormat
)
new KinesisSourceExecutor[ValidatedJsonRecord, EmitterJsonInput](
config,
c,
config.monitoring.metrics,
pipeline
Expand All @@ -97,7 +96,6 @@ object ElasticsearchLoader {
new NsqSourceExecutor(
config.purpose,
c,
config,
goodSink,
badSink,
shardDateField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class ElasticsearchBulkSender(
}
} else Nil

log.info(s"Emitted ${esObjects.size - newFailures.size} records to Elasticseacrch")
log.info(s"Emitted ${esObjects.size - newFailures.size} records to Elasticsearch")
if (newFailures.nonEmpty) logHealth()

val allFailures = oldFailures ++ newFailures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import com.snowplowanalytics.stream.loader.Config.Region

/**
* Signs outgoing HTTP requests to AWS Elasticsearch service
* @param credentialsProvider AWS credentials provider
* @param region in which to sign the requests
*/
class SignedHttpClientConfigCallback(region: Region) extends HttpClientConfigCallback {
Expand Down

0 comments on commit 67c474f

Please sign in to comment.