Skip to content

Commit 347f611

Browse files
authored
Merge pull request #1312 from zhicwu/main
Enhance r2dbc driver
2 parents b3a86be + 065fd7c commit 347f611

File tree

7 files changed

+130
-23
lines changed

7 files changed

+130
-23
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.4.3
2+
### Bug Fixes
3+
* r2dbc driver does not support most client options. [#1299](https://github.com/ClickHouse/clickhouse-java/issues/1299)
4+
* incorrect content from Lz4InputStream when using text-based data format [#48446](https://github.com/ClickHouse/ClickHouse/issues/48446)
5+
16
## 0.4.2, 2023-03-21
27
### Breaking Changes
38
* ClickHouseSqlStatement and *ParserHandler in JDBC driver were refactored to support `compression` and `infile` in insert statement.

clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatement.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public class ClickHouseStatement implements Statement {
2424

2525
private static final Logger log = LoggerFactory.getLogger(ClickHouseStatement.class);
2626

27-
private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.TabSeparatedWithNamesAndTypes;
2827
private static final String NULL_VALUES_ARE_NOT_ALLOWED_AS_VALUE = "null values are not allowed as value.";
2928
private static final String CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE = "class types are not allowed as value.";
3029
private static final String INVALID_PARAMETER_INDEX = "Invalid parameter index! Parameter index must be greater than 0.";
@@ -43,9 +42,7 @@ public class ClickHouseStatement implements Statement {
4342
private int fetchSize;
4443

4544
public ClickHouseStatement(String sql, ClickHouseRequest<?> request) {
46-
this.request = request
47-
.format(PREFERRED_FORMAT)
48-
.query(sql);
45+
this.request = request.query(sql);
4946
namedParameters = request.getPreparedQuery().getParameters();
5047
bindings = new ClickHouseStatementBinding(namedParameters.size());
5148
}

clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnection.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package com.clickhouse.r2dbc.connection;
22

33
import com.clickhouse.client.ClickHouseClient;
4+
import com.clickhouse.client.ClickHouseConfig;
45
import com.clickhouse.client.ClickHouseNode;
56
import com.clickhouse.client.ClickHouseNodeSelector;
6-
import com.clickhouse.client.ClickHouseNodes;
77
import com.clickhouse.client.ClickHouseProtocol;
88
import com.clickhouse.client.ClickHouseRequest;
99
import com.clickhouse.client.config.ClickHouseClientOption;
10+
import com.clickhouse.data.ClickHouseFormat;
1011
import com.clickhouse.logging.Logger;
1112
import com.clickhouse.logging.LoggerFactory;
1213
import com.clickhouse.r2dbc.ClickHouseBatch;
@@ -22,6 +23,7 @@
2223
import reactor.core.publisher.Mono;
2324

2425
import java.time.Duration;
26+
import java.util.function.Function;
2527

2628
import static reactor.core.publisher.Mono.just;
2729

@@ -31,16 +33,20 @@ public class ClickHouseConnection implements Connection {
3133

3234
private static final String PRODUCT_NAME = "ClickHouse-R2dbcDriver";
3335

34-
public static final int DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK = (Integer) ClickHouseClientOption.CONNECTION_TIMEOUT.getDefaultValue();
36+
public static final int DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK = (Integer) ClickHouseClientOption.CONNECTION_TIMEOUT
37+
.getDefaultValue();
3538
final ClickHouseClient client;
3639
final ClickHouseNode node;
3740
private boolean closed = false;
3841

39-
ClickHouseConnection(ClickHouseNodes nodes) {
42+
ClickHouseConnection(Function<ClickHouseNodeSelector, ClickHouseNode> nodes) {
4043
this.node = nodes.apply(ClickHouseNodeSelector.EMPTY);
41-
this.client = ClickHouseClient.newInstance(node.getProtocol());
42-
}
4344

45+
ClickHouseConfig config = this.node.getConfig();
46+
this.client = ClickHouseClient.builder()
47+
.option(ClickHouseClientOption.FORMAT, ClickHouseFormat.RowBinaryWithNamesAndTypes).config(config)
48+
.nodeSelector(ClickHouseNodeSelector.of(this.node.getProtocol())).build();
49+
}
4450

4551
/**
4652
* Transactions are not supported so this is a no-op implementation,
@@ -129,7 +135,6 @@ public boolean isAutoCommit() {
129135
return true;
130136
}
131137

132-
133138
@Override
134139
public ConnectionMetadata getMetadata() {
135140
return new ClickHouseConnectionMetadata(client, node);
@@ -196,7 +201,7 @@ public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
196201
@Override
197202
public Publisher<Boolean> validate(ValidationDepth validationDepth) {
198203
if (validationDepth == ValidationDepth.REMOTE) {
199-
return closed ? just(false) : just(client.ping(node, DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK));
204+
return closed ? just(false) : just(client.ping(node, DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK));
200205
} else { // validationDepth.LOCAL
201206
return just(client != null && !closed);
202207
}

clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package com.clickhouse.r2dbc.connection;
22

3+
import java.util.function.Function;
34

4-
import com.clickhouse.client.ClickHouseNodes;
5+
import com.clickhouse.client.ClickHouseNode;
6+
import com.clickhouse.client.ClickHouseNodeSelector;
57
import io.r2dbc.spi.Connection;
68
import io.r2dbc.spi.ConnectionFactory;
79
import io.r2dbc.spi.ConnectionFactoryMetadata;
810
import reactor.core.publisher.Mono;
911

1012
public class ClickHouseConnectionFactory implements ConnectionFactory {
13+
private final Function<ClickHouseNodeSelector, ClickHouseNode> nodes;
1114

12-
13-
private final ClickHouseNodes nodes;
14-
15-
ClickHouseConnectionFactory(ClickHouseNodes nodes) {
15+
ClickHouseConnectionFactory(Function<ClickHouseNodeSelector, ClickHouseNode> nodes) {
1616
this.nodes = nodes;
1717
}
1818

clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.clickhouse.client.ClickHouseClient;
44
import com.clickhouse.client.ClickHouseNodes;
55
import com.clickhouse.client.ClickHouseProtocol;
6+
import com.clickhouse.client.config.ClickHouseClientOption;
67
import com.clickhouse.client.config.ClickHouseDefaults;
78
import com.clickhouse.config.ClickHouseOption;
89
import com.clickhouse.data.ClickHouseUtils;
@@ -17,6 +18,7 @@
1718
import java.util.LinkedHashSet;
1819
import java.util.List;
1920
import java.util.Locale;
21+
import java.util.Objects;
2022
import java.util.ServiceLoader;
2123
import java.util.Set;
2224

@@ -29,12 +31,21 @@ public class ClickHouseConnectionFactoryProvider implements io.r2dbc.spi.Connect
2931
*/
3032
public static final String CLICKHOUSE_DRIVER = "clickhouse";
3133

34+
/**
35+
* Data format option.
36+
*/
37+
public static final Option<String> FORMAT = Option.valueOf(ClickHouseClientOption.FORMAT.getKey());
38+
3239
private static final List<Option<?>> connQueryParams;
3340

3441
static {
3542
Set<Option<?>> allOptions = new LinkedHashSet<>();
43+
for (ClickHouseOption option : ClickHouseClientOption.values()) {
44+
allOptions.add(Option.valueOf(option.getKey()));
45+
}
3646
try {
37-
for (ClickHouseClient client : ServiceLoader.load(ClickHouseClient.class, ClickHouseConnectionFactoryProvider.class.getClassLoader())) {
47+
for (ClickHouseClient client : ServiceLoader.load(ClickHouseClient.class,
48+
ClickHouseConnectionFactoryProvider.class.getClassLoader())) {
3849
for (ClickHouseOption option : client.getOptionClass().getEnumConstants()) {
3950
allOptions.add(Option.valueOf(option.getKey()));
4051
}
@@ -52,7 +63,7 @@ private String getOptionValueAsString(ConnectionFactoryOptions cfOpt, Option<?>
5263

5364
private String getHosts(ConnectionFactoryOptions cfOpt) {
5465
String hosts = getOptionValueAsString(cfOpt, HOST, ClickHouseDefaults.HOST.getEffectiveDefaultValue());
55-
if (!hosts.contains(",") && !hosts.contains(":")){
66+
if (!hosts.contains(",") && !hosts.contains(":")) {
5667
return hosts + ":" + cfOpt.getValue(PORT);
5768
}
5869
return hosts;
@@ -62,7 +73,8 @@ private String getHosts(ConnectionFactoryOptions cfOpt) {
6273
public ConnectionFactory create(ConnectionFactoryOptions cfOpt) {
6374
String hosts = getHosts(cfOpt);
6475
String database = getOptionValueAsString(cfOpt, DATABASE, "");
65-
String protocol = getOptionValueAsString(cfOpt, PROTOCOL, ClickHouseProtocol.HTTP.name()).toLowerCase(Locale.ROOT);
76+
String protocol = getOptionValueAsString(cfOpt, PROTOCOL, ClickHouseProtocol.HTTP.name())
77+
.toLowerCase(Locale.ROOT);
6678
if (Boolean.parseBoolean(getOptionValueAsString(cfOpt, SSL, "false"))) {
6779
protocol += "s";
6880
}
@@ -74,21 +86,23 @@ public ConnectionFactory create(ConnectionFactoryOptions cfOpt) {
7486
}
7587
String user = getOptionValueAsString(cfOpt, USER, "");
7688
String password = getOptionValueAsString(cfOpt, PASSWORD, "");
77-
urlBuilder.append("?user=").append(ClickHouseUtils.encode(user)).append("&password=").append(ClickHouseUtils.encode(password));
89+
urlBuilder.append("?user=").append(ClickHouseUtils.encode(user)).append("&password=")
90+
.append(ClickHouseUtils.encode(password));
7891
for (Option<?> option : connQueryParams) {
7992
Object value = cfOpt.getValue(option);
8093
if (value != null) {
81-
urlBuilder.append('&').append(option.name()).append('=').append(ClickHouseUtils.encode(cfOpt.getValue(option).toString()));
94+
urlBuilder.append('&').append(option.name()).append('=')
95+
.append(ClickHouseUtils.encode(cfOpt.getValue(option).toString()));
8296
}
8397
}
84-
8598
ClickHouseNodes nodes = ClickHouseNodes.of(urlBuilder.toString());
8699
return new ClickHouseConnectionFactory(nodes);
87100
}
88101

89102
@Override
90103
public boolean supports(ConnectionFactoryOptions connectionFactoryOptions) {
91-
return connectionFactoryOptions.getValue(DRIVER).equals(CLICKHOUSE_DRIVER);
104+
String driverIdentifier = Objects.toString(connectionFactoryOptions.getValue(DRIVER));
105+
return "ch".equalsIgnoreCase(driverIdentifier) || CLICKHOUSE_DRIVER.equalsIgnoreCase(driverIdentifier);
92106
}
93107

94108
@Override
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import org.junit.AfterClass;
4+
import org.junit.BeforeClass;
5+
import org.junit.jupiter.api.AfterAll;
6+
import org.junit.jupiter.api.BeforeAll;
7+
8+
import com.clickhouse.client.BaseIntegrationTest;
9+
import com.clickhouse.client.ClickHouseProtocol;
10+
import com.clickhouse.client.ClickHouseServerForTest;
11+
12+
import io.r2dbc.spi.ConnectionFactories;
13+
import io.r2dbc.spi.ConnectionFactory;
14+
15+
public abstract class BaseR2dbcTest extends BaseIntegrationTest {
16+
protected static final String CUSTOM_PROTOCOL_NAME = System.getProperty("protocol", "http").toUpperCase();
17+
protected static final ClickHouseProtocol DEFAULT_PROTOCOL = ClickHouseProtocol
18+
.valueOf(CUSTOM_PROTOCOL_NAME.indexOf("HTTP") >= 0 ? "HTTP" : CUSTOM_PROTOCOL_NAME);
19+
protected static final String EXTRA_PARAM = CUSTOM_PROTOCOL_NAME.indexOf("HTTP") >= 0
20+
&& !"HTTP".equals(CUSTOM_PROTOCOL_NAME) ? "http_connection_provider=" + CUSTOM_PROTOCOL_NAME : "";
21+
22+
@BeforeAll
23+
@BeforeClass
24+
public static void beforeSuite() throws Exception {
25+
ClickHouseServerForTest.beforeSuite();
26+
}
27+
28+
@AfterAll
29+
@AfterClass
30+
public static void afterSuite() throws Exception {
31+
ClickHouseServerForTest.afterSuite();
32+
}
33+
34+
protected ConnectionFactory getConnectionFactory(ClickHouseProtocol protocol, String... parameters) {
35+
StringBuilder builder = new StringBuilder(getServer(protocol).toUri("r2dbc:ch:").toString());
36+
for (String queryString : parameters) {
37+
if (queryString != null && !queryString.isEmpty()) {
38+
if (queryString.charAt(0) != '&') {
39+
builder.append('&');
40+
}
41+
builder.append(queryString);
42+
}
43+
}
44+
ConnectionFactory connectionFactory = ConnectionFactories.get(builder.toString());
45+
return connectionFactory;
46+
}
47+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.clickhouse.r2dbc.connection;
2+
3+
import java.util.concurrent.atomic.AtomicReference;
4+
5+
import org.junit.Assert;
6+
import org.junit.Test;
7+
8+
import com.clickhouse.r2dbc.BaseR2dbcTest;
9+
10+
import io.r2dbc.spi.ConnectionFactory;
11+
import reactor.core.publisher.Mono;
12+
13+
public class ClickHouseConnectionTest extends BaseR2dbcTest {
14+
protected void executeQuery(String sql, String expectedStringResults, String customConnStr) {
15+
ConnectionFactory connectionFactory = getConnectionFactory(DEFAULT_PROTOCOL, EXTRA_PARAM, customConnStr);
16+
17+
AtomicReference<Throwable> error = new AtomicReference<>();
18+
StringBuilder builder = new StringBuilder(64);
19+
Mono.from(connectionFactory.create())
20+
.flatMapMany(connection -> connection.createStatement(sql).execute())
21+
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0)))
22+
.doOnNext(result -> builder.append(result))
23+
.doOnError(error::set)
24+
.then()
25+
.block();
26+
27+
Assert.assertNull("Should not run into error", error.get());
28+
Assert.assertEquals(expectedStringResults, builder.toString());
29+
}
30+
31+
// @Test
32+
public void testQuery() throws Exception {
33+
String sql = "SELECT * FROM numbers(10)";
34+
String expected = "0123456789";
35+
executeQuery(sql, expected, null);
36+
executeQuery(sql, expected, "format=CSV");
37+
executeQuery(sql, expected, "format=CSV&max_result_rows=11");
38+
}
39+
}

0 commit comments

Comments
 (0)