Skip to content

Commit 95bb6ea

Browse files
committed
Merge branch '66-connection-timeout'
2 parents fe14537 + 2affcea commit 95bb6ea

File tree

6 files changed

+79
-9
lines changed

6 files changed

+79
-9
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.r2dbc.postgresql.util.Assert;
2121
import reactor.util.annotation.Nullable;
2222

23+
import java.time.Duration;
24+
2325
/**
2426
* Connection configuration information for connecting to a PostgreSQL database.
2527
*/
@@ -32,6 +34,8 @@ public final class PostgresqlConnectionConfiguration {
3234

3335
private final String applicationName;
3436

37+
private final Duration connectTimeout;
38+
3539
private final String database;
3640

3741
private final String host;
@@ -44,8 +48,11 @@ public final class PostgresqlConnectionConfiguration {
4448

4549
private final String username;
4650

47-
private PostgresqlConnectionConfiguration(String applicationName, @Nullable String database, String host, String password, int port, @Nullable String schema, String username) {
51+
private PostgresqlConnectionConfiguration(String applicationName, @Nullable Duration connectTimeout, @Nullable String database, String host, String password, int port, @Nullable String schema,
52+
String username) {
53+
4854
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
55+
this.connectTimeout = connectTimeout;
4956
this.database = database;
5057
this.host = Assert.requireNonNull(host, "host must not be null");
5158
this.password = Assert.requireNonNull(password, "password must not be null");
@@ -65,8 +72,9 @@ public static Builder builder() {
6572

6673
@Override
6774
public String toString() {
68-
return "PostgresConnectionConfiguration{" +
75+
return "PostgresqlConnectionConfiguration{" +
6976
"applicationName='" + this.applicationName + '\'' +
77+
", connectTimeout=" + this.connectTimeout +
7078
", database='" + this.database + '\'' +
7179
", host='" + this.host + '\'' +
7280
", password='" + this.password + '\'' +
@@ -80,6 +88,11 @@ String getApplicationName() {
8088
return this.applicationName;
8189
}
8290

91+
@Nullable
92+
Duration getConnectTimeout() {
93+
return this.connectTimeout;
94+
}
95+
8396
@Nullable
8497
String getDatabase() {
8598
return this.database;
@@ -115,6 +128,8 @@ public static final class Builder {
115128

116129
private String applicationName = "r2dbc-postgresql";
117130

131+
private Duration connectTimeout;
132+
118133
private String database;
119134

120135
private String host;
@@ -148,7 +163,12 @@ public Builder applicationName(String applicationName) {
148163
* @return a configured {@link PostgresqlConnectionConfiguration}
149164
*/
150165
public PostgresqlConnectionConfiguration build() {
151-
return new PostgresqlConnectionConfiguration(this.applicationName, this.database, this.host, this.password, this.port, this.schema, this.username);
166+
return new PostgresqlConnectionConfiguration(this.applicationName, this.connectTimeout, this.database, this.host, this.password, this.port, this.schema, this.username);
167+
}
168+
169+
public Builder connectTimeout(@Nullable Duration connectTimeout) {
170+
this.connectTimeout = connectTimeout;
171+
return this;
152172
}
153173

154174
/**
@@ -212,6 +232,7 @@ public Builder schema(@Nullable String schema) {
212232
public String toString() {
213233
return "Builder{" +
214234
"applicationName='" + this.applicationName + '\'' +
235+
", connectTimeout='" + this.connectTimeout + '\'' +
215236
", database='" + this.database + '\'' +
216237
", host='" + this.host + '\'' +
217238
", password='" + this.password + '\'' +

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public PostgresqlConnectionFactory(PostgresqlConnectionConfiguration configurati
4747
this(Mono.defer(() -> {
4848
Assert.requireNonNull(configuration, "configuration must not be null");
4949

50-
return ReactorNettyClient.connect(configuration.getHost(), configuration.getPort()).cast(Client.class);
50+
return ReactorNettyClient.connect(configuration.getHost(), configuration.getPort(), configuration.getConnectTimeout()).cast(Client.class);
5151
}), configuration);
5252
}
5353

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.r2dbc.spi.ConnectionFactoryProvider;
2222
import io.r2dbc.spi.Option;
2323

24+
import static io.r2dbc.spi.ConnectionFactoryOptions.CONNECT_TIMEOUT;
2425
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
2526
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
2627
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
@@ -58,6 +59,7 @@ public PostgresqlConnectionFactory create(ConnectionFactoryOptions connectionFac
5859
builder.applicationName(applicationName);
5960
}
6061

62+
builder.connectTimeout(connectionFactoryOptions.getValue(CONNECT_TIMEOUT));
6163
builder.database(connectionFactoryOptions.getValue(DATABASE));
6264
builder.host(connectionFactoryOptions.getRequiredValue(HOST));
6365
builder.password(connectionFactoryOptions.getRequiredValue(PASSWORD).toString());

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.buffer.ByteBufAllocator;
2020
import io.netty.channel.ChannelDuplexHandler;
2121
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.channel.ChannelOption;
2223
import io.r2dbc.postgresql.message.backend.BackendKeyData;
2324
import io.r2dbc.postgresql.message.backend.BackendMessage;
2425
import io.r2dbc.postgresql.message.backend.BackendMessageDecoder;
@@ -41,8 +42,10 @@
4142
import reactor.netty.Connection;
4243
import reactor.netty.resources.ConnectionProvider;
4344
import reactor.netty.tcp.TcpClient;
45+
import reactor.util.annotation.Nullable;
4446
import reactor.util.concurrent.Queues;
4547

48+
import java.time.Duration;
4649
import java.util.List;
4750
import java.util.Optional;
4851
import java.util.Queue;
@@ -168,7 +171,21 @@ private ReactorNettyClient(Connection connection) {
168171
public static Mono<ReactorNettyClient> connect(String host, int port) {
169172
Assert.requireNonNull(host, "host must not be null");
170173

171-
return connect(ConnectionProvider.newConnection(), host, port);
174+
return connect(ConnectionProvider.newConnection(), host, port, null);
175+
}
176+
177+
/**
178+
* Creates a new frame processor connected to a given host.
179+
*
180+
* @param host the host to connect to
181+
* @param port the port to connect to
182+
* @param connectTimeout connect timeout
183+
* @throws IllegalArgumentException if {@code host} is {@code null}
184+
*/
185+
public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable Duration connectTimeout) {
186+
Assert.requireNonNull(host, "host must not be null");
187+
188+
return connect(ConnectionProvider.newConnection(), host, port, connectTimeout);
172189
}
173190

174191
/**
@@ -177,15 +194,21 @@ public static Mono<ReactorNettyClient> connect(String host, int port) {
177194
* @param connectionProvider the connection provider resources
178195
* @param host the host to connect to
179196
* @param port the port to connect to
197+
* @param connectTimeout connect timeout
180198
* @throws IllegalArgumentException if {@code host} is {@code null}
181199
*/
182-
public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProvider, String host, int port) {
200+
public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProvider, String host, int port, @Nullable Duration connectTimeout) {
183201
Assert.requireNonNull(connectionProvider, "connectionProvider must not be null");
184202
Assert.requireNonNull(host, "host must not be null");
185203

186-
Mono<? extends Connection> connection = TcpClient.create(connectionProvider)
187-
.host(host).port(port)
188-
.connect();
204+
TcpClient tcpClient = TcpClient.create(connectionProvider)
205+
.host(host).port(port);
206+
207+
if (connectTimeout != null) {
208+
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
209+
}
210+
211+
Mono<? extends Connection> connection = tcpClient.connect();
189212

190213
return connection.map(ReactorNettyClient::new);
191214
}

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionConfigurationTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.junit.jupiter.api.Test;
2020

21+
import java.time.Duration;
22+
2123
import static org.assertj.core.api.Assertions.assertThat;
2224
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2325

@@ -51,6 +53,7 @@ void builderNoUsername() {
5153
void configuration() {
5254
PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration.builder()
5355
.applicationName("test-application-name")
56+
.connectTimeout(Duration.ofMillis(1000))
5457
.database("test-database")
5558
.host("test-host")
5659
.password("test-password")
@@ -61,6 +64,7 @@ void configuration() {
6164

6265
assertThat(configuration)
6366
.hasFieldOrPropertyWithValue("applicationName", "test-application-name")
67+
.hasFieldOrPropertyWithValue("connectTimeout", Duration.ofMillis(1000))
6468
.hasFieldOrPropertyWithValue("database", "test-database")
6569
.hasFieldOrPropertyWithValue("host", "test-host")
6670
.hasFieldOrPropertyWithValue("password", "test-password")

src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.r2dbc.postgresql.client;
1818

19+
import io.netty.channel.ConnectTimeoutException;
1920
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
2021
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
2122
import io.r2dbc.postgresql.PostgresqlServerErrorException;
@@ -35,6 +36,7 @@
3536
import reactor.core.publisher.Mono;
3637
import reactor.test.StepVerifier;
3738

39+
import java.time.Duration;
3840
import java.util.Arrays;
3941
import java.util.stream.IntStream;
4042

@@ -148,6 +150,24 @@ void parallelExchange() {
148150
.verifyComplete();
149151
}
150152

153+
@Test
154+
void timeoutTest() {
155+
PostgresqlConnectionFactory postgresqlConnectionFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
156+
.host("example.com")
157+
.port(81)
158+
.username("test")
159+
.password("test")
160+
.database(SERVER.getDatabase())
161+
.applicationName(ReactorNettyClientTest.class.getName())
162+
.connectTimeout(Duration.ofMillis(200))
163+
.build());
164+
165+
postgresqlConnectionFactory.create()
166+
.as(StepVerifier::create)
167+
.expectError(ConnectTimeoutException.class)
168+
.verify(Duration.ofMillis(500));
169+
}
170+
151171
@Nested
152172
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
153173
final class ScramTest {

0 commit comments

Comments
 (0)