Skip to content

Commit c4a3fcd

Browse files
committed
GH-72: Add KinesisLocalRunning and tests
Fixes #72 * Document Kinesis Channel Adapters * Fix some inconsistency in the `KinesisMessageHandler` * Add integration test against `KinesisLocalRunning` `@Rule` * Document testing against Kinesalite
1 parent 638d5e6 commit c4a3fcd

File tree

6 files changed

+361
-12
lines changed

6 files changed

+361
-12
lines changed

README.md

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,114 @@ The implementation is based on a simple table with `KEY` and `VALUE` attributes,
504504
By default the `SpringIntegrationMetadataStore` table is used and it is created during `DynamoDbMetaDataStore` initialization if that doesn't exist yet.
505505
The `DynamoDbMetaDataStore` can be used for the `KinesisMessageDrivenChannelAdapter` as a cloud-based `cehckpointStore`.
506506
507+
For testing application with the `DynamoDbMetaDataStore` you can use [Dynalite][] NPM module.
508+
What you need in your application is to configure DynamoDB client properly:
509+
510+
````java
511+
String url = "http://localhost:" + this.port;
512+
513+
this.amazonDynamoDB = AmazonDynamoDBAsyncClientBuilder.standard()
514+
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
515+
.withClientConfiguration(
516+
new ClientConfiguration()
517+
.withMaxErrorRetry(0)
518+
.withConnectionTimeout(1000))
519+
.withEndpointConfiguration(
520+
new AwsClientBuilder.EndpointConfiguration(url, Regions.DEFAULT_REGION.getName()))
521+
.build();
522+
````
523+
524+
Where you should specify the port on which you have ran the Dynalite service.
525+
Also you can use for you testing purpose a copy of `org.springframework.integration.aws.DynamoDbLocalRunning` in the `/test` directory of this project.
526+
527+
## Amazon Kinesis
528+
529+
Amazon Kinesis is a platform for streaming data on AWS, making it easy to load and analyze streaming data, and also providing the ability for you to build custom streaming data applications for specialized needs.
530+
The Spring Integration solution is fully based on the Standard `aws-java-sdk-kinesis` and doesn't use [Kinesis Client Library][] and isn't compatible with it.
531+
532+
### Inbound Channel Adapter
533+
534+
The `KinesisMessageDrivenChannelAdapter` is an extension of the `MessageProducerSupport` - event-driver channel adapter.
535+
536+
See `KinesisMessageDrivenChannelAdapter` JavaDocs and its setters for more information how to use and how to configure it in the application for Kinesis streams ingestion.
537+
538+
The Java Configuration is pretty simple:
539+
540+
````java
541+
@SpringBootApplication
542+
public static class MyConfiguration {
543+
544+
@Bean
545+
public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel(AmazonKinesis amazonKinesis) {
546+
KinesisMessageDrivenChannelAdapter adapter =
547+
new KinesisMessageDrivenChannelAdapter(amazonKinesis, "MY_STREAM");
548+
adapter.setOutputChannel(kinesisReceiveChannel());
549+
return adapter;
550+
}
551+
}
552+
````
553+
554+
This channel adapter can be configured with the `DynamoDbMetaDataStore` mentioned above to track sequence checkpoints for shards in the cloud environment when we have several instances of our Kinesis application.
555+
By default this adapter uses `DeserializingConverter` to convert `byte[]` from the `Record` data.
556+
Can be specified as `null` with meaning no conversion and the target `Message` is sent with the `byte[]` payload.
557+
558+
The consumer group is included to the metadata store `key`.
559+
When records are consumed, they are filtered by the last stored `lastCheckpoint` under the key as `[CONSUMER_GROUP]:[STREAM]:[SHARD_ID]`.
560+
561+
### Outbound Channel Adapter
562+
563+
The `KinesisMessageHandler` is a `AbstractMessageHandler` to perform put record to the Kinesis stream.
564+
The stream, partition key (or explicit hash key) and sequence number can be determined against request message via evaluation provided expressions or can be specified statically.
565+
They also can specified as `AwsHeaders.STREAM`, `AwsHeaders.PARTITION_KEY` and `AwsHeaders.SEQUENCE_NUMBER` respectively.
566+
567+
The `payload` of request message can be:
568+
569+
- `PutRecordsRequest` to perform `AmazonKinesisAsync.putRecordsAsync`
570+
- `PutRecordRequest` to perform `AmazonKinesisAsync.putRecordAsync`
571+
- `ByteBuffer` to represent a data of the `PutRecordRequest`
572+
- `byte[]` which is wrapped to the `ByteBuffer`
573+
- any other type which is converted to the `byte[]` by the provided `Converter`; the `SerializingConverter` is used by default.
574+
575+
The Java Configuration for the message handler:
576+
577+
````java
578+
@SpringBootApplication
579+
public static class MyConfiguration {
580+
581+
@Bean
582+
@ServiceActivator(inputChannel = "kinesisSendChannel")
583+
public MessageHandler kinesisMessageHandler(AmazonKinesis amazonKinesis) {
584+
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
585+
kinesisMessageHandler.setPartitionKey("1");
586+
return kinesisMessageHandler;
587+
}
588+
589+
}
590+
````
591+
592+
For testing application with the Kinesis Channel Adapters you can use [Kinesalite][] NPM module.
593+
What you need in your application is to configure Kinesis client properly:
594+
595+
````java
596+
String url = "http://localhost:" + this.port;
597+
598+
// See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
599+
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
600+
601+
this.amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
602+
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
603+
.withClientConfiguration(
604+
new ClientConfiguration()
605+
.withMaxErrorRetry(0)
606+
.withConnectionTimeout(1000))
607+
.withEndpointConfiguration(
608+
new AwsClientBuilder.EndpointConfiguration(url, Regions.DEFAULT_REGION.getName()))
609+
.build();
610+
````
611+
612+
Where you should specify the port on which you have ran the Kinesalite service.
613+
Also you can use for you testing purpose a copy of `org.springframework.integration.aws.KinesisLocalRunning` in the `/test` directory of this project.
614+
507615
[Spring Cloud AWS]: https://github.com/spring-cloud/spring-cloud-aws
508616
[AWS SDK for Java]: http://aws.amazon.com/sdkforjava/
509617
[Amazon Web Services]: http://aws.amazon.com/
@@ -513,3 +621,6 @@ The `DynamoDbMetaDataStore` can be used for the `KinesisMessageDrivenChannelAdap
513621
[Reference Manual]: http://docs.spring.io/spring-integration/reference/html/ftp.html
514622
[Pull requests]: http://help.github.com/send-pull-requests
515623
[contributor guidelines]: https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.adoc
624+
[Dynalite]: [https://github.com/mhart/dynalite]
625+
[Kinesis Client Library]: [https://github.com/awslabs/amazon-kinesis-client]
626+
[Kinesalite]: [https://github.com/mhart/kinesalite]

src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 the original author or authors.
2+
* Copyright 2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -131,7 +131,7 @@ public void setExplicitHashKeyExpression(Expression explicitHashKeyExpression) {
131131
this.explicitHashKeyExpression = explicitHashKeyExpression;
132132
}
133133

134-
public void setSequenceNumberString(String sequenceNumberExpression) {
134+
public void setSequenceNumberExpressionString(String sequenceNumberExpression) {
135135
setSequenceNumberExpression(EXPRESSION_PARSER.parseExpression(sequenceNumberExpression));
136136
}
137137

@@ -143,6 +143,14 @@ public void setSync(boolean sync) {
143143
this.sync = sync;
144144
}
145145

146+
public void setSendTimeout(long sendTimeout) {
147+
setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
148+
}
149+
150+
public void setSendTimeoutExpressionString(String sendTimeoutExpression) {
151+
setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression));
152+
}
153+
146154
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
147155
this.sendTimeoutExpression = sendTimeoutExpression;
148156
}
@@ -162,7 +170,6 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
162170
(AsyncHandler<PutRecordsRequest, PutRecordsResult>) this.asyncHandler);
163171
}
164172
else {
165-
166173
PutRecordRequest putRecordRequest = (message.getPayload() instanceof PutRecordRequest)
167174
? (PutRecordRequest) message.getPayload()
168175
: buildPutRecordRequest(message);
@@ -227,8 +234,7 @@ private PutRecordRequest buildPutRecordRequest(Message<?> message) {
227234
? (byte[]) payload
228235
: this.converter.convert(payload);
229236

230-
data = ByteBuffer.wrap(
231-
bytes);
237+
data = ByteBuffer.wrap(bytes);
232238
}
233239

234240
return new PutRecordRequest()

src/test/java/org/springframework/integration/aws/DynamoDbRunning.java renamed to src/test/java/org/springframework/integration/aws/DynamoDbLocalRunning.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,16 @@
3838
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
3939

4040
/**
41+
* The {@link TestWatcher} implementation for local Amazon DynamoDB service.
42+
* See https://github.com/mhart/dynalite.
43+
*
4144
* @author Artem Bilan
4245
*
4346
* @since 1.1
4447
*/
45-
public final class DynamoDbRunning extends TestWatcher {
48+
public final class DynamoDbLocalRunning extends TestWatcher {
4649

47-
private static Log logger = LogFactory.getLog(DynamoDbRunning.class);
50+
private static Log logger = LogFactory.getLog(DynamoDbLocalRunning.class);
4851

4952
// Static so that we only test once on failure: speeds up test suite
5053
private static Map<Integer, Boolean> dynamoDbOnline = new HashMap<>();
@@ -53,7 +56,7 @@ public final class DynamoDbRunning extends TestWatcher {
5356

5457
private AmazonDynamoDBAsync amazonDynamoDB;
5558

56-
private DynamoDbRunning(int port) {
59+
private DynamoDbLocalRunning(int port) {
5760
this.port = port;
5861
dynamoDbOnline.put(port, true);
5962
}
@@ -89,8 +92,8 @@ public Statement apply(Statement base, Description description) {
8992
}
9093

9194

92-
public static DynamoDbRunning isRunning(int port) {
93-
return new DynamoDbRunning(port);
95+
public static DynamoDbLocalRunning isRunning(int port) {
96+
return new DynamoDbLocalRunning(port);
9497
}
9598

9699
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aws;
18+
19+
import static org.junit.Assume.assumeNoException;
20+
import static org.junit.Assume.assumeTrue;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import org.apache.commons.logging.Log;
26+
import org.apache.commons.logging.LogFactory;
27+
import org.junit.rules.TestWatcher;
28+
import org.junit.runner.Description;
29+
import org.junit.runners.model.Statement;
30+
31+
import com.amazonaws.ClientConfiguration;
32+
import com.amazonaws.SDKGlobalConfiguration;
33+
import com.amazonaws.SdkClientException;
34+
import com.amazonaws.auth.AWSStaticCredentialsProvider;
35+
import com.amazonaws.auth.BasicAWSCredentials;
36+
import com.amazonaws.client.builder.AwsClientBuilder;
37+
import com.amazonaws.regions.Regions;
38+
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
39+
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
40+
41+
/**
42+
* The {@link TestWatcher} implementation for local Amazon Kinesis service.
43+
* See https://github.com/mhart/kinesalite.
44+
*
45+
* @author Artem Bilan
46+
*
47+
* @since 1.1
48+
*/
49+
public final class KinesisLocalRunning extends TestWatcher {
50+
51+
private static Log logger = LogFactory.getLog(KinesisLocalRunning.class);
52+
53+
// Static so that we only test once on failure: speeds up test suite
54+
private static Map<Integer, Boolean> kinesisOnline = new HashMap<>();
55+
56+
private final int port;
57+
58+
private AmazonKinesisAsync amazonKinesis;
59+
60+
private KinesisLocalRunning(int port) {
61+
this.port = port;
62+
kinesisOnline.put(port, true);
63+
}
64+
65+
public AmazonKinesisAsync getKinesis() {
66+
return this.amazonKinesis;
67+
}
68+
69+
@Override
70+
public Statement apply(Statement base, Description description) {
71+
assumeTrue(kinesisOnline.get(this.port));
72+
73+
String url = "http://localhost:" + this.port;
74+
75+
// See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
76+
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
77+
78+
this.amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
79+
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
80+
.withClientConfiguration(
81+
new ClientConfiguration()
82+
.withMaxErrorRetry(0)
83+
.withConnectionTimeout(1000))
84+
.withEndpointConfiguration(
85+
new AwsClientBuilder.EndpointConfiguration(url, Regions.DEFAULT_REGION.getName()))
86+
.build();
87+
88+
try {
89+
this.amazonKinesis.listStreams();
90+
}
91+
catch (SdkClientException e) {
92+
logger.warn("Tests not running because no Kinesis on " + url, e);
93+
assumeNoException(e);
94+
}
95+
96+
97+
return new Statement() {
98+
public void evaluate() throws Throwable {
99+
try {
100+
base.evaluate();
101+
} finally {
102+
System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
103+
}
104+
105+
}
106+
};
107+
}
108+
109+
public static KinesisLocalRunning isRunning(int port) {
110+
return new KinesisLocalRunning(port);
111+
}
112+
113+
}

0 commit comments

Comments
 (0)