Skip to content

Commit ace6cfe

Browse files
joshistespring-builds
authored andcommitted
PostgresChannelMessageTableSubscriber: Renew connection only if invalid
Fixes: #9111 An evolution of the #9061: renew the connection only when we need to. (cherry picked from commit da29e2d)
1 parent 5a5f2de commit ace6cfe

File tree

2 files changed

+20
-5
lines changed

2 files changed

+20
-5
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,10 @@ private void doStart(CountDownLatch startingLatch) {
236236
if (!isActive()) {
237237
return;
238238
}
239-
if (notifications == null || notifications.length == 0) {
239+
if ((notifications == null || notifications.length == 0) && !conn.isValid(1)) {
240240
//We did not receive any notifications within the timeout period.
241-
//We will close the connection and re-establish it.
241+
//If the connection is still valid, we will continue polling
242+
//Otherwise, we will close the connection and re-establish it.
242243
break;
243244
}
244245
for (PGNotification notification : notifications) {

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.List;
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.function.Consumer;
2830

2931
import javax.sql.DataSource;
3032

@@ -270,7 +272,18 @@ public void testRenewConnection() throws Exception {
270272
CountDownLatch latch = new CountDownLatch(2);
271273
List<Object> payloads = new ArrayList<>();
272274
CountDownLatch connectionLatch = new CountDownLatch(2);
273-
connectionSupplier.onGetConnection = connectionLatch::countDown;
275+
AtomicBoolean connectionCloseState = new AtomicBoolean();
276+
connectionSupplier.onGetConnection = conn -> {
277+
connectionLatch.countDown();
278+
if (connectionCloseState.compareAndSet(false, true)) {
279+
try {
280+
conn.close();
281+
}
282+
catch (Exception e) {
283+
//nop
284+
}
285+
}
286+
};
274287
postgresChannelMessageTableSubscriber.start();
275288
postgresSubscribableChannel.subscribe(message -> {
276289
payloads.add(message.getPayload());
@@ -326,7 +339,7 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {
326339

327340
private static class ConnectionSupplier implements PgConnectionSupplier {
328341

329-
Runnable onGetConnection;
342+
Consumer<PgConnection> onGetConnection;
330343

331344
@Override
332345
public PgConnection get() throws SQLException {
@@ -335,10 +348,11 @@ public PgConnection get() throws SQLException {
335348
POSTGRES_CONTAINER.getPassword())
336349
.unwrap(PgConnection.class);
337350
if (this.onGetConnection != null) {
338-
this.onGetConnection.run();
351+
this.onGetConnection.accept(conn);
339352
}
340353
return conn;
341354
}
342355

343356
}
357+
344358
}

0 commit comments

Comments
 (0)