Skip to content

Commit 41b389e

Browse files
committed
Move Kinesis batch checkpoint before sending
Since the listener may take some time for records processing, there is a possibility that checkpoint will be stored too late after the process and thus we are able to get the same records in different channel adapter for the same shard, even if they are in the same consumer group and use shared `MetadataStore` This solution is some compromise for the current state of things and has to be reconsidered in the future in favor of proper rebalance and shard leader election solution As a workaround for the duplicate records an additional `@IdempotentReceiver` approach can be used * Upgrade to Gradle 4.2.1, Checkstyle 8.3, AssertJ 3.8.0 * Fix race condition in the `KinesisMessageDrivenChannelAdapterTests`
1 parent 43754e1 commit 41b389e

File tree

6 files changed

+15
-9
lines changed

6 files changed

+15
-9
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ target
1010
/*.ipr
1111
/*.iws
1212
bin/
13+
out

build.gradle

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ buildscript {
33
maven { url 'http://repo.spring.io/plugins-release' }
44
}
55
dependencies {
6-
classpath 'io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE'
6+
classpath 'io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE'
77
classpath 'io.spring.gradle:spring-io-plugin:0.0.8.RELEASE'
88
}
99
}
@@ -31,11 +31,11 @@ repositories {
3131
}
3232

3333
ext {
34-
assertjVersion = '3.6.2'
34+
assertjVersion = '3.8.0'
3535
servletApiVersion = '3.1.0'
3636
slf4jVersion = '1.7.25'
3737
springCloudAwsVersion = '1.2.1.RELEASE'
38-
springIntegrationVersion = '4.3.11.RELEASE'
38+
springIntegrationVersion = '4.3.12.RELEASE'
3939

4040
idPrefix = 'aws'
4141

@@ -96,7 +96,7 @@ jacoco {
9696

9797
checkstyle {
9898
configFile = file("${rootDir}/src/checkstyle/checkstyle.xml")
99-
toolVersion = "8.1"
99+
toolVersion = "8.3"
100100
}
101101

102102
dependencies {

gradle/wrapper/gradle-wrapper.jar

4 Bytes
Binary file not shown.

gradle/wrapper/gradle-wrapper.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
33
zipStoreBase=GRADLE_USER_HOME
44
zipStorePath=wrapper/dists
5-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
5+
distributionUrl=https\://services.gradle.org/distributions/gradle-4.2.1-bin.zip

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,12 @@ private void processRecords(List<Record> records) {
885885
if (logger.isTraceEnabled()) {
886886
logger.trace("Processing records: " + records + " for [" + ShardConsumer.this + "]");
887887
}
888+
889+
// TODO Reconsider this logic after rebalance and shard leader election implementation
890+
if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
891+
this.checkpointer.checkpoint();
892+
}
893+
888894
switch (KinesisMessageDrivenChannelAdapter.this.listenerMode) {
889895
case record:
890896
for (Record record : records) {
@@ -909,6 +915,7 @@ private void processRecords(List<Record> records) {
909915
this.checkpointer.checkpoint(record.getSequenceNumber());
910916
}
911917
}
918+
912919
break;
913920

914921
case batch:
@@ -925,10 +932,6 @@ private void processRecords(List<Record> records) {
925932
break;
926933

927934
}
928-
929-
if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
930-
this.checkpointer.checkpoint();
931-
}
932935
}
933936
}
934937

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ public void testReshadring() throws InterruptedException {
205205

206206
// When resharding happens the describeStream() is performed again
207207
verify(this.amazonKinesisForResharding, atLeast(2)).describeStream(any(DescribeStreamRequest.class));
208+
209+
this.reshardingChannelAdapter.stop();
208210
}
209211

210212
@Configuration

0 commit comments

Comments
 (0)