Skip to content

Commit 3b48e07

Browse files
committed
support custom writer for streaming
1 parent a1a617c commit 3b48e07

File tree

2 files changed

+126
-0
lines changed

2 files changed

+126
-0
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.clickhouse.client;
22

3+
import java.io.IOException;
34
import java.io.InputStream;
45
import java.io.OutputStream;
56
import java.io.Serializable;
7+
import java.io.UncheckedIOException;
68
import java.util.ArrayList;
79
import java.util.Collection;
810
import java.util.Collections;
@@ -14,8 +16,10 @@
1416
import java.util.Map;
1517
import java.util.Objects;
1618
import java.util.Map.Entry;
19+
import java.util.concurrent.CancellationException;
1720
import java.util.concurrent.CompletableFuture;
1821
import java.util.concurrent.CompletionException;
22+
import java.util.concurrent.ExecutionException;
1923
import java.util.concurrent.atomic.AtomicReference;
2024
import java.util.Optional;
2125
import java.util.Properties;
@@ -39,6 +43,8 @@ public class ClickHouseRequest<SelfT extends ClickHouseRequest<SelfT>> implement
3943
* Mutation request.
4044
*/
4145
public static class Mutation extends ClickHouseRequest<Mutation> {
46+
private ClickHouseWriter writer;
47+
4248
protected Mutation(ClickHouseRequest<?> request, boolean sealed) {
4349
super(request.getClient(), request.server, request.serverRef, request.options, sealed);
4450
this.settings.putAll(request.settings);
@@ -105,6 +111,20 @@ public Mutation format(ClickHouseFormat format) {
105111
return super.format(format);
106112
}
107113

114+
/**
115+
* Sets custom writer for streaming. This will create a piped stream between the
116+
* writer and ClickHouse server.
117+
*
118+
* @param writer writer
119+
* @return mutation request
120+
*/
121+
public Mutation data(ClickHouseWriter writer) {
122+
checkSealed();
123+
124+
this.writer = changeProperty(PROP_WRITER, this.writer, writer);
125+
return this;
126+
}
127+
108128
/**
109129
* Loads data from given file which may or may not be compressed.
110130
*
@@ -197,6 +217,70 @@ public Mutation data(ClickHouseDeferredValue<ClickHouseInputStream> input) {
197217
return this;
198218
}
199219

220+
@Override
221+
public CompletableFuture<ClickHouseResponse> execute() {
222+
if (writer != null) {
223+
ClickHouseConfig c = getConfig();
224+
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
225+
.createPipedOutputStream(c, null);
226+
data(stream.getInputStream());
227+
CompletableFuture<ClickHouseResponse> future = null;
228+
if (c.isAsync()) {
229+
future = getClient().execute(isSealed() ? this : seal());
230+
}
231+
try (ClickHouseOutputStream out = stream) {
232+
writer.write(out);
233+
} catch (IOException e) {
234+
throw new CompletionException(e);
235+
}
236+
if (future != null) {
237+
return future;
238+
}
239+
}
240+
241+
return getClient().execute(isSealed() ? this : seal());
242+
}
243+
244+
@Override
245+
public ClickHouseResponse executeAndWait() throws ClickHouseException {
246+
if (writer != null) {
247+
ClickHouseConfig c = getConfig();
248+
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
249+
.createPipedOutputStream(c, null);
250+
data(stream.getInputStream());
251+
CompletableFuture<ClickHouseResponse> future = null;
252+
if (c.isAsync()) {
253+
future = getClient().execute(isSealed() ? this : seal());
254+
}
255+
try (ClickHouseOutputStream out = stream) {
256+
writer.write(out);
257+
} catch (IOException e) {
258+
throw ClickHouseException.of(e, getServer());
259+
}
260+
if (future != null) {
261+
try {
262+
return future.get();
263+
} catch (InterruptedException e) {
264+
Thread.currentThread().interrupt();
265+
throw ClickHouseException.forCancellation(e, getServer());
266+
} catch (CancellationException e) {
267+
throw ClickHouseException.forCancellation(e, getServer());
268+
} catch (ExecutionException | UncheckedIOException e) {
269+
Throwable cause = e.getCause();
270+
if (cause == null) {
271+
cause = e;
272+
}
273+
throw cause instanceof ClickHouseException ? (ClickHouseException) cause
274+
: ClickHouseException.of(cause, getServer());
275+
} catch (RuntimeException e) { // unexpected
276+
throw ClickHouseException.of(e, getServer());
277+
}
278+
}
279+
}
280+
281+
return getClient().executeAndWait(isSealed() ? this : seal());
282+
}
283+
200284
/**
201285
* Sends mutation requets for execution. Same as
202286
* {@code client.execute(request.seal())}.
@@ -256,6 +340,7 @@ public Mutation seal() {
256340
static final String PROP_PREPARED_QUERY = "preparedQuery";
257341
static final String PROP_QUERY = "query";
258342
static final String PROP_QUERY_ID = "queryId";
343+
static final String PROP_WRITER = "writer";
259344

260345
private final boolean sealed;
261346

clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.UUID;
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.atomic.AtomicInteger;
2425
import java.util.stream.Collectors;
2526

2627
import com.clickhouse.client.ClickHouseClientBuilder.Agent;
@@ -909,6 +910,46 @@ public void testCustomRead() throws Exception {
909910
Assert.assertEquals(count, 1000L);
910911
}
911912

913+
@Test(groups = { "integration" })
914+
public void testCustomWriter() throws Exception {
915+
ClickHouseNode server = getServer();
916+
ClickHouseClient.send(server, "drop table if exists test_custom_writer",
917+
"create table test_custom_writer(a Int8) engine=Memory")
918+
.get();
919+
920+
try (ClickHouseClient client = getClient()) {
921+
AtomicInteger i = new AtomicInteger(1);
922+
ClickHouseRequest.Mutation req = client.connect(server).write().format(ClickHouseFormat.RowBinary)
923+
.table("test_custom_writer").data(o -> {
924+
o.write(i.getAndIncrement());
925+
});
926+
for (boolean b : new boolean[] { true, false }) {
927+
req.option(ClickHouseClientOption.ASYNC, b);
928+
929+
try (ClickHouseResponse resp = req.send().get()) {
930+
Assert.assertNotNull(resp);
931+
}
932+
933+
try (ClickHouseResponse resp = req.sendAndWait()) {
934+
Assert.assertNotNull(resp);
935+
}
936+
937+
try (ClickHouseResponse resp = req.execute().get()) {
938+
Assert.assertNotNull(resp);
939+
}
940+
941+
try (ClickHouseResponse resp = req.executeAndWait()) {
942+
Assert.assertNotNull(resp);
943+
}
944+
}
945+
946+
try (ClickHouseResponse resp = client.connect(server).query("select count(1) from test_custom_writer")
947+
.executeAndWait()) {
948+
Assert.assertEquals(resp.firstRecord().getValue(0).asInteger(), i.get() - 1);
949+
}
950+
}
951+
}
952+
912953
@Test(groups = { "integration" })
913954
public void testDumpAndLoadFile() throws Exception {
914955
// super.testLoadRawData();

0 commit comments

Comments
 (0)