Skip to content

Commit 297a1c4

Browse files
committed
GH-246: Add gracefulShutdownTimeout for KCL
Fixes: #246 Issue link: #246
1 parent ab2f9b8 commit 297a1c4

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import java.util.Arrays;
2222
import java.util.List;
2323
import java.util.UUID;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
2427
import java.util.function.Consumer;
2528

2629
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
@@ -174,6 +177,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
174177

175178
private long pollingIdleTime = 1500L;
176179

180+
private long gracefulShutdownTimeout;
181+
177182
public KclMessageDrivenChannelAdapter(String... streams) {
178183
this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
179184
}
@@ -393,6 +398,17 @@ public void setPollingIdleTime(long pollingIdleTime) {
393398
this.pollingIdleTime = pollingIdleTime;
394399
}
395400

401+
/**
402+
* The timeout for {@link Scheduler#startGracefulShutdown()}.
403+
* Defaults to {@code 0} with the meaning to call {@link Scheduler#shutdown()}.
404+
* @param gracefulShutdownTimeout the timeout for {@link Scheduler#startGracefulShutdown()}.
405+
* @since 3.0.8
406+
* @see Scheduler#startGracefulShutdown()
407+
*/
408+
public void setGracefulShutdownTimeout(long gracefulShutdownTimeout) {
409+
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
410+
}
411+
396412
@Override
397413
protected void onInit() {
398414
super.onInit();
@@ -494,14 +510,25 @@ protected void doStart() {
494510
@Override
495511
protected void doStop() {
496512
super.doStop();
497-
this.scheduler.shutdown();
513+
if (this.gracefulShutdownTimeout == 0) {
514+
this.scheduler.shutdown();
515+
}
516+
else {
517+
try {
518+
logger.info("Start graceful shutdown for KCL...");
519+
this.scheduler.startGracefulShutdown().get(this.gracefulShutdownTimeout, TimeUnit.MILLISECONDS);
520+
}
521+
catch (InterruptedException | ExecutionException | TimeoutException ex) {
522+
throw new RuntimeException("Graceful shutdown for KCL has failed.", ex);
523+
}
524+
}
498525
}
499526

500527
@Override
501528
public void destroy() {
502529
super.destroy();
503530
if (isRunning()) {
504-
this.scheduler.shutdown();
531+
stop();
505532
}
506533
}
507534

src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
194194
adapter.setBindSourceRecord(true);
195195
adapter.setEmptyRecordList(true);
196196
adapter.setPollingMaxRecords(99);
197+
adapter.setGracefulShutdownTimeout(100);
197198
return adapter;
198199
}
199200

0 commit comments

Comments
 (0)