Skip to content

Commit 259ff7b

Browse files
committedFeb 10, 2025
GH-252: Fix NPE in the KinesisMessageDrivenChannelAdapter.
Fixes: #252 * Adapt to Java `23` * Upgrade to Gradle `8.12.1` * Include `org.mockito` & `net.bytebuddy` dependencies explicitly since they are overridden by transitive dependencies to not compatible versions with Java `23` * Add `-parameters` compiler option to include method param names into bytecode for better discovery by reflection
1 parent 6cb04f3 commit 259ff7b

File tree

7 files changed

+21
-19
lines changed

7 files changed

+21
-19
lines changed
 

‎build.gradle

+6-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ repositories {
2626
}
2727

2828
ext {
29-
assertjVersion = '3.26.3'
3029
awaitilityVersion = '4.2.2'
3130
awsSdkVersion = '2.20.162'
3231
jacksonVersion = '2.15.4'
@@ -126,10 +125,11 @@ dependencies {
126125

127126
optionalApi "jakarta.servlet:jakarta.servlet-api:$servletApiVersion"
128127

129-
testImplementation('org.springframework.integration:spring-integration-test') {
130-
exclude group: 'junit'
131-
}
132-
testImplementation "org.assertj:assertj-core:$assertjVersion"
128+
testImplementation 'org.mockito:mockito-core:5.15.2'
129+
testImplementation 'net.bytebuddy:byte-buddy:1.15.11'
130+
testImplementation 'net.bytebuddy:byte-buddy-agent:1.15.11'
131+
132+
testImplementation 'org.springframework.integration:spring-integration-test'
133133
testImplementation("org.awaitility:awaitility:$awaitilityVersion") {
134134
exclude group: 'org.hamcrest'
135135
}
@@ -172,7 +172,7 @@ javadoc {
172172

173173
// enable all compiler warnings; individual projects may customize further
174174
ext.xLintArg = '-Xlint:all,-options'
175-
[compileJava, compileTestJava]*.options*.compilerArgs = [xLintArg]
175+
[compileJava, compileTestJava]*.options*.compilerArgs = [xLintArg, '-parameters']
176176

177177
test {
178178
maxHeapSize = '1024m'
@@ -186,7 +186,6 @@ check.dependsOn javadoc
186186
task updateCopyrights {
187187
onlyIf { !isCI }
188188
inputs.files(modifiedFiles)
189-
outputs.dir('build/classes')
190189

191190
doLast {
192191
def now = Calendar.instance.get(Calendar.YEAR) as String

‎gradle/wrapper/gradle-wrapper.properties

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionSha256Sum=1541fa36599e12857140465f3c91a97409b4512501c26f9631fb113e392c5bd1
4-
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-bin.zip
3+
distributionSha256Sum=8d97a97984f6cbd2b85fe4c60a743440a347544bf18818048e611f5288d46c94
4+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.12.1-bin.zip
55
networkTimeout=10000
66
validateDistributionUrl=true
77
zipStoreBase=GRADLE_USER_HOME

‎gradlew

+1-2
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ done
8686
# shellcheck disable=SC2034
8787
APP_BASE_NAME=${0##*/}
8888
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
89-
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
90-
' "$PWD" ) || exit
89+
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
9190

9291
# Use the maximum available, or set MAX_FD != -1 to use that value.
9392
MAX_FD=maximum

‎src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -53,6 +53,7 @@ public S3InboundFileSynchronizer(S3Client amazonS3) {
5353
* {@link Session} instances.
5454
* @param sessionFactory The session factory.
5555
*/
56+
@SuppressWarnings("this-escape")
5657
public S3InboundFileSynchronizer(SessionFactory<S3Object> sessionFactory) {
5758
super(sessionFactory);
5859
doSetRemoteDirectoryExpression(new LiteralExpression(null));

‎src/main/java/org/springframework/integration/aws/inbound/S3StreamingMessageSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,6 +45,7 @@ public S3StreamingMessageSource(RemoteFileTemplate<S3Object> template) {
4545
super(template, null);
4646
}
4747

48+
@SuppressWarnings("this-escape")
4849
public S3StreamingMessageSource(RemoteFileTemplate<S3Object> template, Comparator<S3Object> comparator) {
4950
super(template, comparator);
5051
doSetFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3StreamingMessageSource"));

‎src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2024 the original author or authors.
2+
* Copyright 2017-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1150,7 +1150,7 @@ private Runnable processTask() {
11501150
};
11511151
}
11521152

1153-
private void rewindIteratorOnError(Exception ex, GetRecordsResponse result) {
1153+
private void rewindIteratorOnError(Exception ex, @Nullable GetRecordsResponse result) {
11541154
String lastCheckpoint = this.checkpointer.getLastCheckpointValue();
11551155
String highestSequence = this.checkpointer.getHighestSequence();
11561156

@@ -1159,7 +1159,7 @@ private void rewindIteratorOnError(Exception ex, GetRecordsResponse result) {
11591159
logger.info(ex, "getRecords request has thrown exception. " +
11601160
"No checkpoints - re-request with the current shard iterator.");
11611161
}
1162-
else if (highestSequence.equals(lastCheckpoint)) {
1162+
else if (highestSequence.equals(lastCheckpoint) && result != null) {
11631163
logger.info(ex, "Record processor has thrown exception. " +
11641164
"Ignore since the highest sequence in batch was check-pointed.");
11651165
this.shardIterator = result.nextShardIterator();
@@ -1187,8 +1187,10 @@ else if (reRequestCurrentShardIterator(lastCheckpoint, result)) {
11871187
}
11881188
}
11891189

1190-
private boolean reRequestCurrentShardIterator(@Nullable String lastCheckpoint, GetRecordsResponse result) {
1191-
if (lastCheckpoint == null) {
1190+
private boolean reRequestCurrentShardIterator(@Nullable String lastCheckpoint,
1191+
@Nullable GetRecordsResponse result) {
1192+
1193+
if (lastCheckpoint == null || result == null) {
11921194
return true;
11931195
}
11941196
List<Record> records = result.records();

‎src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class KplBackpressureException extends RuntimeException {
3434
@Serial
3535
private static final long serialVersionUID = 1L;
3636

37-
private final UserRecord userRecord;
37+
private final transient UserRecord userRecord;
3838

3939
public KplBackpressureException(String message, UserRecord userRecord) {
4040
super(message);

0 commit comments

Comments
 (0)