Skip to content

Commit 186314e

Browse files
GH-249 Code Review comments addressed. Added Test cases with RetryAdvice.
1 parent 4d4fcf8 commit 186314e

File tree

3 files changed

+77
-48
lines changed

3 files changed

+77
-48
lines changed

src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import org.springframework.expression.Expression;
5050
import org.springframework.expression.common.LiteralExpression;
5151
import org.springframework.integration.aws.support.AwsHeaders;
52-
import org.springframework.integration.aws.support.KPLBackpressureException;
52+
import org.springframework.integration.aws.support.KplBackpressureException;
5353
import org.springframework.integration.aws.support.UserRecordResponse;
5454
import org.springframework.integration.expression.ValueExpression;
5555
import org.springframework.integration.handler.AbstractMessageHandler;
@@ -67,6 +67,8 @@
6767
* The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer
6868
* Library {@code putRecord(s)}.
6969
*
70+
* @exception KplBackpressureException When backpressure handling is enabled and buffer is at max capacity.
71+
*
7072
* @author Arnaud Lecollaire
7173
* @author Artem Bilan
7274
* @author Siddharth Jain
@@ -121,13 +123,16 @@ public void setConverter(Converter<Object, byte[]> converter) {
121123

122124
/**
123125
* Configure maximum records in flight for handling backpressure. By Default, backpressure handling is not enabled.
124-
* On number of records in flight exceeding the threshold, {@link KPLBackpressureException} would be thrown.
125-
* If Backpressure handling is enabled, {@link KPLBackpressureException} must be handled.
126+
* On number of records in flight exceeding the threshold, {@link KplBackpressureException} would be thrown.
127+
* If Backpressure handling is enabled, {@link KplBackpressureException} must be handled.
128+
*
126129
* @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling.
130+
*
127131
* @since 3.0.9
128132
*/
129133
public void setBackPressureThreshold(long backPressureThreshold) {
130-
Assert.isTrue(backPressureThreshold > 0, "'backPressureThreshold must be greater than 0.");
134+
Assert.isTrue(backPressureThreshold >= 0,
135+
"'backPressureThreshold must be greater than equal to 0.");
131136
this.backPressureThreshold = backPressureThreshold;
132137
}
133138

@@ -383,16 +388,12 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
383388
}
384389
}
385390

386-
private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord)
387-
throws KPLBackpressureException {
388-
391+
private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
389392
if (this.backPressureThreshold > 0) {
390393
var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount();
391394
if (numberOfRecordsInFlight > this.backPressureThreshold) {
392-
logger.error(String.format("Backpressure handling is enabled, Number of records in flight: %s is " +
393-
"greater than backpressure threshold: %s" +
394-
".", numberOfRecordsInFlight, this.backPressureThreshold));
395-
throw new KPLBackpressureException("Buffer already at max capacity.");
395+
throw new KplBackpressureException("Cannot send record to kinesis since buffer is at max capacity.",
396+
userRecord);
396397
}
397398
}
398399

src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java renamed to src/main/java/org/springframework/integration/aws/support/KplBackpressureException.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,32 @@
1616

1717
package org.springframework.integration.aws.support;
1818

19-
import org.springframework.messaging.MessagingException;
19+
import com.amazonaws.services.kinesis.producer.UserRecord;
2020

2121
/**
2222
* An exception triggered from {@link org.springframework.integration.aws.outbound.KplMessageHandler} while sending
2323
* records to kinesis when maximum number of records in flight exceeds the backpressure threshold.
24+
*
2425
* @author Siddharth Jain
26+
*
2527
* @since 3.0.9
2628
*/
27-
public class KPLBackpressureException extends MessagingException {
29+
public class KplBackpressureException extends RuntimeException {
30+
2831
private static final long serialVersionUID = 1L;
2932

30-
public KPLBackpressureException(String message) {
33+
private final UserRecord userRecord;
34+
35+
public KplBackpressureException(String message, UserRecord userRecord) {
3136
super(message);
37+
this.userRecord = userRecord;
38+
}
39+
40+
/**
41+
* Get the {@link UserRecord} related.
42+
* @return {@link UserRecord} linked while sending the record to kinesis.
43+
*/
44+
public UserRecord getUserRecord() {
45+
return this.userRecord;
3246
}
3347
}

src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,30 @@
2828
import org.springframework.context.annotation.Configuration;
2929
import org.springframework.integration.annotation.ServiceActivator;
3030
import org.springframework.integration.aws.support.AwsHeaders;
31-
import org.springframework.integration.aws.support.KPLBackpressureException;
31+
import org.springframework.integration.aws.support.KplBackpressureException;
3232
import org.springframework.integration.config.EnableIntegration;
33+
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
3334
import org.springframework.messaging.Message;
3435
import org.springframework.messaging.MessageChannel;
3536
import org.springframework.messaging.MessageHandler;
37+
import org.springframework.messaging.MessageHandlingException;
3638
import org.springframework.messaging.support.MessageBuilder;
39+
import org.springframework.retry.support.RetryTemplate;
3740
import org.springframework.test.annotation.DirtiesContext;
3841
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3942

4043
import static org.assertj.core.api.Assertions.assertThat;
44+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
4145
import static org.mockito.ArgumentMatchers.any;
4246
import static org.mockito.BDDMockito.given;
4347
import static org.mockito.Mockito.clearInvocations;
4448
import static org.mockito.Mockito.mock;
4549
import static org.mockito.Mockito.verify;
4650

47-
/**
51+
/** The class contains test cases for KplMessageHandler.
52+
*
4853
* @author Siddharth Jain
54+
*
4955
* @since 3.0.9
5056
*/
5157
@SpringJUnitConfig
@@ -63,42 +69,42 @@ public class KplMessageHandlerTests {
6369

6470
@Test
6571
@SuppressWarnings("unchecked")
66-
void testKPLMessageHandler_raw_payload_success() {
72+
void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() {
6773
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
6874
.willReturn(mock());
6975
final Message<?> message = MessageBuilder
70-
.withPayload("message1")
71-
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
76+
.withPayload("someMessage")
77+
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
7278
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
73-
.setHeader("foo", "bar")
79+
.setHeader("someHeaderKey", "someHeaderValue")
7480
.build();
7581

7682

7783
ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
7884
.forClass(UserRecord.class);
79-
85+
this.kplMessageHandler.setBackPressureThreshold(0);
8086
this.kinesisSendChannel.send(message);
8187
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
82-
verify(this.kinesisProducer, Mockito.times(0)).getOutstandingRecordsCount();
88+
verify(this.kinesisProducer, Mockito.never()).getOutstandingRecordsCount();
8389
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
84-
assertThat(userRecord.getStreamName()).isEqualTo("foo");
85-
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey");
90+
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
91+
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
8692
assertThat(userRecord.getExplicitHashKey()).isNull();
8793
}
8894

8995
@Test
9096
@SuppressWarnings("unchecked")
91-
void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() {
97+
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() {
9298
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
9399
.willReturn(mock());
94100
this.kplMessageHandler.setBackPressureThreshold(2);
95101
given(this.kinesisProducer.getOutstandingRecordsCount())
96102
.willReturn(1);
97103
final Message<?> message = MessageBuilder
98-
.withPayload("message1")
99-
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
104+
.withPayload("someMessage")
105+
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
100106
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
101-
.setHeader("foo", "bar")
107+
.setHeader("someHeaderKey", "someHeaderValue")
102108
.build();
103109

104110

@@ -107,40 +113,36 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() {
107113

108114
this.kinesisSendChannel.send(message);
109115
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
110-
verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount();
116+
verify(this.kinesisProducer).getOutstandingRecordsCount();
111117
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
112-
assertThat(userRecord.getStreamName()).isEqualTo("foo");
113-
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey");
118+
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
119+
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
114120
assertThat(userRecord.getExplicitHashKey()).isNull();
115121
}
116122

117123
@Test
118124
@SuppressWarnings("unchecked")
119-
void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() {
125+
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() {
120126
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
121127
.willReturn(mock());
122128
this.kplMessageHandler.setBackPressureThreshold(2);
123129
given(this.kinesisProducer.getOutstandingRecordsCount())
124130
.willReturn(5);
125131
final Message<?> message = MessageBuilder
126-
.withPayload("message1")
127-
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
132+
.withPayload("someMessage")
133+
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
128134
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
129-
.setHeader("foo", "bar")
135+
.setHeader("someHeaderKey", "someHeaderValue")
130136
.build();
131137

132-
try {
133-
this.kinesisSendChannel.send(message);
134-
}
135-
catch (Exception ex) {
136-
assertThat(ex).isNotNull();
137-
assertThat(ex.getCause()).isNotNull();
138-
assertThat(ex.getCause().getClass()).isEqualTo(KPLBackpressureException.class);
139-
assertThat(ex.getCause().getMessage()).isEqualTo("Buffer already at max capacity.");
140-
}
138+
assertThatExceptionOfType(RuntimeException.class)
139+
.isThrownBy(() -> this.kinesisSendChannel.send(message))
140+
.withCauseInstanceOf(MessageHandlingException.class)
141+
.withRootCauseExactlyInstanceOf(KplBackpressureException.class)
142+
.withStackTraceContaining("Cannot send record to kinesis since buffer is at max capacity.");
141143

142-
verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class));
143-
verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount();
144+
verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class));
145+
verify(this.kinesisProducer).getOutstandingRecordsCount();
144146
}
145147

146148
@AfterEach
@@ -158,13 +160,25 @@ public KinesisProducer kinesisProducer() {
158160
}
159161

160162
@Bean
161-
@ServiceActivator(inputChannel = "kinesisSendChannel")
163+
public RequestHandlerRetryAdvice retryAdvice() {
164+
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
165+
requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder()
166+
.retryOn(KplBackpressureException.class)
167+
.exponentialBackoff(100, 2.0, 1000)
168+
.maxAttempts(3)
169+
.build());
170+
return requestHandlerRetryAdvice;
171+
}
172+
173+
@Bean
174+
@ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = {"retryAdvice"})
162175
public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
163176
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
164177
kplMessageHandler.setAsync(true);
165-
kplMessageHandler.setStream("foo");
178+
kplMessageHandler.setStream("someStream");
166179
return kplMessageHandler;
167180
}
181+
168182
}
169183

170184
}

0 commit comments

Comments
 (0)