Skip to content

Commit d3d8089

Browse files
authored
Merge pull request #963 from zhicwu/fit-and-finish
Prepare 0.3.2-patch10 release
2 parents 75ddd37 + 5a1c464 commit d3d8089

File tree

11 files changed

+369
-146
lines changed

11 files changed

+369
-146
lines changed

README.md

Lines changed: 65 additions & 107 deletions
Large diffs are not rendered by default.

clickhouse-cli-client/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ Either [clickhouse-client](https://clickhouse.com/docs/en/interfaces/cli/) or [d
1818

1919
```xml
2020
<dependency>
21-
<!-- will stop using ru.yandex.clickhouse starting from 0.4.0 -->
21+
<!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
2222
<groupId>com.clickhouse</groupId>
2323
<artifactId>clickhouse-cli-client</artifactId>
24-
<version>0.3.2-patch9</version>
24+
<version>0.3.2-patch10</version>
2525
</dependency>
2626
```
2727

clickhouse-client/README.md

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,55 @@
11
# ClickHouse Java Client
22

3-
Async Java client for ClickHouse. `clickhouse-client` is an abstract module, so it does not work by itself until being used together with an implementation like `clickhouse-grpc-client` or `clickhouse-http-client`.
3+
Async Java client for ClickHouse. `clickhouse-client` is an abstract module, so it does not work by itself until being used together with an implementation like `clickhouse-http-client`, `clickhouse-grpc-client` or `clickhouse-cli-client`.
44

5+
## Configuration
6+
7+
You can pass any client option([common](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java), [http](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java), [grpc](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/config/ClickHouseGrpcOption.java), and [cli](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/config/ClickHouseCommandLineOption.java)) to `ClickHouseRequest.option()` and [server setting](https://clickhouse.com/docs/en/operations/settings/) to `ClickHouseRequest.set()` before execution, for instance:
8+
9+
```java
10+
client.connect("http://localhost/system")
11+
.query("select 1")
12+
// short version of option(ClickHouseClientOption.FORMAT, ClickHouseFormat.RowBinaryWithNamesAndTypes)
13+
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
14+
.option(ClickHouseClientOption.SOCKET_TIMEOUT, 30000 * 2) // 60 seconds
15+
.set("max_rows_to_read", 100)
16+
.set("read_overflow_mode", "throw")
17+
.execute()
18+
.whenComplete((response, throwable) -> {
19+
if (throwable != null) {
20+
log.error("Unexpected error", throwable);
21+
} else {
22+
try {
23+
for (ClickHouseRecord rec : response.records()) {
24+
// ...
25+
}
26+
} finally {
27+
response.close();
28+
}
29+
}
30+
});
31+
```
32+
33+
[Default value](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseDefaults.java) can be either configured via system property or environment variable.
534

635
## Quick Start
736

837
```xml
938
<dependency>
1039
<groupId>com.clickhouse</groupId>
1140
<artifactId>clickhouse-http-client</artifactId>
12-
<version>0.3.2-patch9</version>
41+
<version>0.3.2-patch10</version>
1342
</dependency>
1443
```
1544

1645
```java
17-
// declare a server to connect to
18-
ClickHouseNode server = ClickHouseNode.of("server1.domain", ClickHouseProtocol.HTTP, 8123, "my_db");
46+
// declare a list of servers to connect to
47+
ClickHouseNodes servers = ClickHouseNodes.of(
48+
"jdbc:ch:http://server1.domain,server2.domain,server3.domain/my_db"
49+
+ "?load_balancing_policy=random&health_check_interval=5000&failover=2");
1950

2051
// execute multiple queries in a worker thread one after another within same session
21-
CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(server,
52+
CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.get(),
2253
"create database if not exists test",
2354
"use test", // change current database from my_db to test
2455
"create table if not exists test_table(s String) engine=Memory",
@@ -30,20 +61,18 @@ CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.sen
3061
// block current thread until queries completed, and then retrieve summaries
3162
// List<ClickHouseResponseSummary> results = future.get();
3263

33-
try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) {
34-
ClickHouseRequest<?> request = client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
64+
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
65+
ClickHouseRequest<?> request = client.connect(servers).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
3566
// load data into a table and wait until it's completed
36-
request.write().query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
67+
request.write()
68+
.query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
3769
.data(myInputStream).execute().thenAccept(response -> {
3870
response.close();
3971
});
4072

4173
// query with named parameter
42-
try (ClickHouseResponse response = request.query(
43-
ClickHouseParameterizedQuery.of(
44-
request.getConfig(),
45-
"select * from numbers(:limit)")
46-
).params(100000).executeAndWait()) {
74+
try (ClickHouseResponse response = request.query(ClickHouseParameterizedQuery.of(
75+
request.getConfig(), "select * from numbers(:limit)")).params(100000).executeAndWait()) {
4776
for (ClickHouseRecord r : response.records()) {
4877
// Don't cache ClickHouseValue / ClickHouseRecord as they're reused for
4978
// corresponding column / row

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ public static ClickHouseNode of(URI uri, ClickHouseNode template) {
801801
}
802802
if (protocol != ClickHouseProtocol.POSTGRESQL && scheme.charAt(scheme.length() - 1) == 's') {
803803
params.put(ClickHouseClientOption.SSL.getKey(), Boolean.TRUE.toString());
804-
params.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.NONE.name());
804+
params.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
805805
}
806806

807807
ClickHouseCredentials credentials = template.credentials;

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.clickhouse.client;
22

3+
import java.io.IOException;
34
import java.io.InputStream;
45
import java.io.OutputStream;
56
import java.io.Serializable;
7+
import java.io.UncheckedIOException;
68
import java.util.ArrayList;
79
import java.util.Collection;
810
import java.util.Collections;
@@ -14,8 +16,10 @@
1416
import java.util.Map;
1517
import java.util.Objects;
1618
import java.util.Map.Entry;
19+
import java.util.concurrent.CancellationException;
1720
import java.util.concurrent.CompletableFuture;
1821
import java.util.concurrent.CompletionException;
22+
import java.util.concurrent.ExecutionException;
1923
import java.util.concurrent.atomic.AtomicReference;
2024
import java.util.Optional;
2125
import java.util.Properties;
@@ -39,6 +43,8 @@ public class ClickHouseRequest<SelfT extends ClickHouseRequest<SelfT>> implement
3943
* Mutation request.
4044
*/
4145
public static class Mutation extends ClickHouseRequest<Mutation> {
46+
private ClickHouseWriter writer;
47+
4248
protected Mutation(ClickHouseRequest<?> request, boolean sealed) {
4349
super(request.getClient(), request.server, request.serverRef, request.options, sealed);
4450
this.settings.putAll(request.settings);
@@ -105,6 +111,20 @@ public Mutation format(ClickHouseFormat format) {
105111
return super.format(format);
106112
}
107113

114+
/**
115+
* Sets custom writer for streaming. This will create a piped stream between the
116+
* writer and ClickHouse server.
117+
*
118+
* @param writer writer
119+
* @return mutation request
120+
*/
121+
public Mutation data(ClickHouseWriter writer) {
122+
checkSealed();
123+
124+
this.writer = changeProperty(PROP_WRITER, this.writer, writer);
125+
return this;
126+
}
127+
108128
/**
109129
* Loads data from given file which may or may not be compressed.
110130
*
@@ -197,6 +217,70 @@ public Mutation data(ClickHouseDeferredValue<ClickHouseInputStream> input) {
197217
return this;
198218
}
199219

220+
@Override
221+
public CompletableFuture<ClickHouseResponse> execute() {
222+
if (writer != null) {
223+
ClickHouseConfig c = getConfig();
224+
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
225+
.createPipedOutputStream(c, null);
226+
data(stream.getInputStream());
227+
CompletableFuture<ClickHouseResponse> future = null;
228+
if (c.isAsync()) {
229+
future = getClient().execute(isSealed() ? this : seal());
230+
}
231+
try (ClickHouseOutputStream out = stream) {
232+
writer.write(out);
233+
} catch (IOException e) {
234+
throw new CompletionException(e);
235+
}
236+
if (future != null) {
237+
return future;
238+
}
239+
}
240+
241+
return getClient().execute(isSealed() ? this : seal());
242+
}
243+
244+
@Override
245+
public ClickHouseResponse executeAndWait() throws ClickHouseException {
246+
if (writer != null) {
247+
ClickHouseConfig c = getConfig();
248+
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
249+
.createPipedOutputStream(c, null);
250+
data(stream.getInputStream());
251+
CompletableFuture<ClickHouseResponse> future = null;
252+
if (c.isAsync()) {
253+
future = getClient().execute(isSealed() ? this : seal());
254+
}
255+
try (ClickHouseOutputStream out = stream) {
256+
writer.write(out);
257+
} catch (IOException e) {
258+
throw ClickHouseException.of(e, getServer());
259+
}
260+
if (future != null) {
261+
try {
262+
return future.get();
263+
} catch (InterruptedException e) {
264+
Thread.currentThread().interrupt();
265+
throw ClickHouseException.forCancellation(e, getServer());
266+
} catch (CancellationException e) {
267+
throw ClickHouseException.forCancellation(e, getServer());
268+
} catch (ExecutionException | UncheckedIOException e) {
269+
Throwable cause = e.getCause();
270+
if (cause == null) {
271+
cause = e;
272+
}
273+
throw cause instanceof ClickHouseException ? (ClickHouseException) cause
274+
: ClickHouseException.of(cause, getServer());
275+
} catch (RuntimeException e) { // unexpected
276+
throw ClickHouseException.of(e, getServer());
277+
}
278+
}
279+
}
280+
281+
return getClient().executeAndWait(isSealed() ? this : seal());
282+
}
283+
200284
/**
201285
* Sends mutation requets for execution. Same as
202286
* {@code client.execute(request.seal())}.
@@ -256,6 +340,7 @@ public Mutation seal() {
256340
static final String PROP_PREPARED_QUERY = "preparedQuery";
257341
static final String PROP_QUERY = "query";
258342
static final String PROP_QUERY_ID = "queryId";
343+
static final String PROP_WRITER = "writer";
259344

260345
private final boolean sealed;
261346

clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseNodeTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public void testInvalidNodes() {
169169
public void testValidNodes() {
170170
Map<String, String> options = new HashMap<>();
171171
options.put(ClickHouseClientOption.SSL.getKey(), "false");
172-
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
172+
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
173173
options.put(ClickHouseClientOption.DATABASE.getKey(), "db1");
174174

175175
Set<String> tags = new HashSet<>();
@@ -183,7 +183,7 @@ public void testValidNodes() {
183183
public void testSecureNode() {
184184
Map<String, String> options = new HashMap<>();
185185
options.put(ClickHouseClientOption.SSL.getKey(), "true");
186-
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
186+
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
187187
options.put(ClickHouseClientOption.DATABASE.getKey(), "db1");
188188

189189
Assert.assertEquals(ClickHouseNode.of("https://node1:443/db1"),
@@ -218,7 +218,7 @@ public void testSingleWordNode() {
218218
public void testNodeWithProtocol() {
219219
Map<String, String> options = new HashMap<>();
220220
options.put(ClickHouseClientOption.SSL.getKey(), "true");
221-
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
221+
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
222222

223223
for (ClickHouseProtocol p : ClickHouseProtocol.values()) {
224224
Assert.assertEquals(ClickHouseNode.of(p.name() + ":///?#"),
@@ -254,7 +254,7 @@ public void testNodeWithHostAndPort() {
254254
public void testNodeWithDatabase() {
255255
Map<String, String> options = new HashMap<>();
256256
options.put(ClickHouseClientOption.SSL.getKey(), "true");
257-
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
257+
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
258258

259259
Assert.assertEquals(ClickHouseNode.of("grpcs://node1:19100/"),
260260
new ClickHouseNode("node1", ClickHouseProtocol.GRPC, 19100, null, options, null));
@@ -324,13 +324,13 @@ public void testNodeWithOptions() {
324324
Map<String, String> options = new HashMap<>();
325325
options.put(ClickHouseClientOption.ASYNC.getKey(), "false");
326326
options.put(ClickHouseClientOption.SSL.getKey(), "true");
327-
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
327+
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
328328
options.put(ClickHouseClientOption.CONNECTION_TIMEOUT.getKey(), "500");
329329

330330
for (String uri : new String[] {
331331
"https://node1?!async&ssl&connect_timeout=500",
332-
"http://node1?async=false&ssl=true&sslmode=NONE&connect_timeout=500",
333-
"http://node1?&&&&async=false&ssl&&&&&sslmode=NONE&connect_timeout=500&&&",
332+
"http://node1?async=false&ssl=true&sslmode=STRICT&connect_timeout=500",
333+
"http://node1?&&&&async=false&ssl&&&&&sslmode=STRICT&connect_timeout=500&&&",
334334
}) {
335335
Assert.assertEquals(ClickHouseNode.of(uri),
336336
new ClickHouseNode("node1", ClickHouseProtocol.HTTP,
@@ -379,7 +379,7 @@ public void testQueryWithSlash() throws Exception {
379379
Assert.assertEquals(server.toUri(), new URI("http://localhost:1234?/a/b/c=d"));
380380

381381
Assert.assertEquals(ClickHouseNode.of("https://myserver/db/1/2/3?a%20=%201&b=/root/my.crt").toUri(),
382-
new URI("http://myserver:8443/db/1/2/3?ssl=true&sslmode=NONE&a%20=%201&b=/root/my.crt"));
382+
new URI("http://myserver:8443/db/1/2/3?ssl=true&sslmode=STRICT&a%20=%201&b=/root/my.crt"));
383383
}
384384

385385
@Test(groups = { "integration" })

clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseNodesTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.clickhouse.client.ClickHouseNode.Status;
1414
import com.clickhouse.client.config.ClickHouseClientOption;
1515
import com.clickhouse.client.config.ClickHouseDefaults;
16+
import com.clickhouse.client.config.ClickHouseSslMode;
1617

1718
import org.testng.Assert;
1819
import org.testng.annotations.Test;
@@ -213,7 +214,7 @@ public void testSingleNodeList() {
213214

214215
Map<String, String> options = new HashMap<>();
215216
options.put(ClickHouseClientOption.SSL.getKey(), "true");
216-
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
217+
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
217218
options.put(ClickHouseClientOption.DATABASE.getKey(), "db1");
218219

219220
Assert.assertEquals(ClickHouseNodes.of("https://node1:443/db1").nodes.get(0),

clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.UUID;
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.atomic.AtomicInteger;
2425
import java.util.stream.Collectors;
2526

2627
import com.clickhouse.client.ClickHouseClientBuilder.Agent;
@@ -909,6 +910,46 @@ public void testCustomRead() throws Exception {
909910
Assert.assertEquals(count, 1000L);
910911
}
911912

913+
@Test(groups = { "integration" })
914+
public void testCustomWriter() throws Exception {
915+
ClickHouseNode server = getServer();
916+
ClickHouseClient.send(server, "drop table if exists test_custom_writer",
917+
"create table test_custom_writer(a Int8) engine=Memory")
918+
.get();
919+
920+
try (ClickHouseClient client = getClient()) {
921+
AtomicInteger i = new AtomicInteger(1);
922+
ClickHouseRequest.Mutation req = client.connect(server).write().format(ClickHouseFormat.RowBinary)
923+
.table("test_custom_writer").data(o -> {
924+
o.write(i.getAndIncrement());
925+
});
926+
for (boolean b : new boolean[] { true, false }) {
927+
req.option(ClickHouseClientOption.ASYNC, b);
928+
929+
try (ClickHouseResponse resp = req.send().get()) {
930+
Assert.assertNotNull(resp);
931+
}
932+
933+
try (ClickHouseResponse resp = req.sendAndWait()) {
934+
Assert.assertNotNull(resp);
935+
}
936+
937+
try (ClickHouseResponse resp = req.execute().get()) {
938+
Assert.assertNotNull(resp);
939+
}
940+
941+
try (ClickHouseResponse resp = req.executeAndWait()) {
942+
Assert.assertNotNull(resp);
943+
}
944+
}
945+
946+
try (ClickHouseResponse resp = client.connect(server).query("select count(1) from test_custom_writer")
947+
.executeAndWait()) {
948+
Assert.assertEquals(resp.firstRecord().getValue(0).asInteger(), i.get() - 1);
949+
}
950+
}
951+
}
952+
912953
@Test(groups = { "integration" })
913954
public void testDumpAndLoadFile() throws Exception {
914955
// super.testLoadRawData();

0 commit comments

Comments
 (0)