Skip to content

Commit f64f77c

Browse files
committed
GH-244: Add UserRecordResponse adapter for UserRecordResult
Fixes: #244 The `UserRecordResult` has an `attempts()` property which might be used post-put request logic in the output channel. * Introduce a `KinesisResponse` extension to adapt a `UserRecordResult`. This way the further flow logic may consult low-level result from KPL via existing `AwsHeaders.SERVICE_RESULT` header in the reply message
1 parent 6f5b58f commit f64f77c

File tree

2 files changed

+77
-9
lines changed

2 files changed

+77
-9
lines changed

Diff for: src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,7 +39,6 @@
3939
import software.amazon.awssdk.core.SdkBytes;
4040
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
4141
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
42-
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
4342
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
4443
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
4544
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
@@ -50,6 +49,7 @@
5049
import org.springframework.expression.Expression;
5150
import org.springframework.expression.common.LiteralExpression;
5251
import org.springframework.integration.aws.support.AwsHeaders;
52+
import org.springframework.integration.aws.support.UserRecordResponse;
5353
import org.springframework.integration.expression.ValueExpression;
5454
import org.springframework.integration.handler.AbstractMessageHandler;
5555
import org.springframework.integration.mapping.HeaderMapper;
@@ -305,7 +305,7 @@ else if (message.getPayload() instanceof UserRecord userRecord) {
305305

306306
@Override
307307
protected Map<String, ?> additionalOnSuccessHeaders(AwsRequest request, AwsResponse response) {
308-
if (response instanceof PutRecordResponse putRecordResponse) {
308+
if (response instanceof UserRecordResponse putRecordResponse) {
309309
return Map.of(AwsHeaders.SHARD, putRecordResponse.shardId(),
310310
AwsHeaders.SEQUENCE_NUMBER, putRecordResponse.sequenceNumber());
311311
}
@@ -367,14 +367,10 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
367367
}
368368
}
369369

370-
private CompletableFuture<PutRecordResponse> handleUserRecord(UserRecord userRecord) {
370+
private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
371371
ListenableFuture<UserRecordResult> recordResult = this.kinesisProducer.addUserRecord(userRecord);
372372
return listenableFutureToCompletableFuture(recordResult)
373-
.thenApply(result ->
374-
PutRecordResponse.builder()
375-
.shardId(result.getShardId())
376-
.sequenceNumber(result.getSequenceNumber())
377-
.build());
373+
.thenApply(UserRecordResponse::new);
378374
}
379375

380376
private PutRecordRequest buildPutRecordRequest(Message<?> message) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aws.support;
18+
19+
import java.util.List;
20+
21+
import com.amazonaws.services.kinesis.producer.Attempt;
22+
import com.amazonaws.services.kinesis.producer.UserRecordResult;
23+
import software.amazon.awssdk.awscore.AwsResponse;
24+
import software.amazon.awssdk.core.SdkField;
25+
import software.amazon.awssdk.services.kinesis.model.KinesisResponse;
26+
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
27+
28+
/**
29+
* The {@link KinesisResponse} adapter for the KPL {@link UserRecordResult} response.
30+
*
31+
* @author Artem Bilan
32+
*
33+
* @since 3.0.8
34+
*/
35+
public class UserRecordResponse extends KinesisResponse {
36+
37+
private final String shardId;
38+
39+
private final String sequenceNumber;
40+
41+
private final List<Attempt> attempts;
42+
43+
public UserRecordResponse(UserRecordResult userRecordResult) {
44+
super(PutRecordResponse.builder());
45+
this.shardId = userRecordResult.getShardId();
46+
this.sequenceNumber = userRecordResult.getSequenceNumber();
47+
this.attempts = userRecordResult.getAttempts();
48+
}
49+
50+
public String shardId() {
51+
return this.shardId;
52+
}
53+
54+
public String sequenceNumber() {
55+
return this.sequenceNumber;
56+
}
57+
58+
public List<Attempt> attempts() {
59+
return this.attempts;
60+
}
61+
62+
@Override
63+
public AwsResponse.Builder toBuilder() {
64+
throw new UnsupportedOperationException();
65+
}
66+
67+
@Override
68+
public List<SdkField<?>> sdkFields() {
69+
throw new UnsupportedOperationException();
70+
}
71+
72+
}

0 commit comments

Comments
 (0)