Skip to content

Commit c22cdc1

Browse files
committed
file-based data loading and dumping
1 parent 0e9b519 commit c22cdc1

36 files changed

+883
-110
lines changed

Diff for: clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ protected final ClickHouseNode getServer() {
7878

7979
/**
8080
* Checks if the underlying connection can be reused. In general, new connection
81-
* will be created when {@code connection} is null or {@code requestServer} is
82-
* different from {@code currentServer} - the existing connection will be closed
83-
* in the later case.
81+
* will be created when {@code connection} is {@code null} or
82+
* {@code requestServer} is different from {@code currentServer} - the existing
83+
* connection will be closed in the later case.
8484
*
8585
* @param connection existing connection which may or may not be null
8686
* @param requestServer non-null requested server, returned from previous call

Diff for: clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseChecker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public static BigDecimal between(BigDecimal value, String name, BigDecimal minVa
189189
}
190190

191191
/**
192-
* Checks if the given string is null or empty.
192+
* Checks if the given string is {@code null} or empty.
193193
*
194194
* @param value the string to check
195195
* @return true if the string is null or empty; false otherwise

Diff for: clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java

+72-9
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,46 @@ static <T> CompletableFuture<T> submit(Callable<T> task) {
252252
}
253253
}
254254

255+
/**
256+
* Dumps a table or query result from server into a file. File will be
257+
* created/overwrited as needed.
258+
*
259+
* @param server non-null server to connect to
260+
* @param tableOrQuery table name or a select query
261+
* @param file output file
262+
* @return non-null future object to get result
263+
* @throws IllegalArgumentException if any of server, tableOrQuery, and output
264+
* is null
265+
* @throws CompletionException when error occurred during execution
266+
*/
267+
static CompletableFuture<ClickHouseResponseSummary> dump(ClickHouseNode server, String tableOrQuery,
268+
ClickHouseFile file) {
269+
if (server == null || tableOrQuery == null || file == null) {
270+
throw new IllegalArgumentException("Non-null server, tableOrQuery, and file are required");
271+
}
272+
273+
// in case the protocol is ANY
274+
final ClickHouseNode theServer = ClickHouseCluster.probe(server);
275+
276+
final String theQuery = tableOrQuery.trim();
277+
278+
return submit(() -> {
279+
try (ClickHouseClient client = newInstance(theServer.getProtocol())) {
280+
ClickHouseRequest<?> request = client.connect(theServer).output(file);
281+
// FIXME what if the table name is `try me`?
282+
if (theQuery.indexOf(' ') < 0) {
283+
request.table(theQuery);
284+
} else {
285+
request.query(theQuery);
286+
}
287+
288+
try (ClickHouseResponse response = request.executeAndWait()) {
289+
return response.getSummary();
290+
}
291+
}
292+
});
293+
}
294+
255295
/**
256296
* Dumps a table or query result from server into a file. File will be
257297
* created/overwrited as needed.
@@ -302,8 +342,8 @@ static CompletableFuture<ClickHouseResponseSummary> dump(ClickHouseNode server,
302342

303343
return submit(() -> {
304344
try (ClickHouseClient client = newInstance(theServer.getProtocol())) {
305-
ClickHouseRequest<?> request = client.connect(theServer).compressServerResponse(
306-
compression != null && compression != ClickHouseCompression.NONE, compression).format(format);
345+
ClickHouseRequest<?> request = client.connect(theServer).compressServerResponse(compression)
346+
.format(format).output(output);
307347
// FIXME what if the table name is `try me`?
308348
if (theQuery.indexOf(' ') < 0) {
309349
request.table(theQuery);
@@ -312,7 +352,6 @@ static CompletableFuture<ClickHouseResponseSummary> dump(ClickHouseNode server,
312352
}
313353

314354
try (ClickHouseResponse response = request.executeAndWait()) {
315-
response.pipe(output, request.getConfig().getWriteBufferSize());
316355
return response.getSummary();
317356
}
318357
} finally {
@@ -325,6 +364,33 @@ static CompletableFuture<ClickHouseResponseSummary> dump(ClickHouseNode server,
325364
});
326365
}
327366

367+
/**
368+
* Loads data from given file into a table.
369+
*
370+
* @param server non-null server to connect to
371+
* @param table non-null target table
372+
* @param file non-null file
373+
* @return future object to get result
374+
* @throws IllegalArgumentException if any of server, table, and input is null
375+
* @throws CompletionException when error occurred during execution
376+
*/
377+
static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server, String table, ClickHouseFile file) {
378+
if (server == null || table == null || file == null) {
379+
throw new IllegalArgumentException("Non-null server, table, and file are required");
380+
}
381+
382+
// in case the protocol is ANY
383+
final ClickHouseNode theServer = ClickHouseCluster.probe(server);
384+
385+
return submit(() -> {
386+
try (ClickHouseClient client = newInstance(theServer.getProtocol());
387+
ClickHouseResponse response = client.connect(theServer).write().table(table).data(file)
388+
.executeAndWait()) {
389+
return response.getSummary();
390+
}
391+
});
392+
}
393+
328394
/**
329395
* Loads data from a file into table using specified format and compression
330396
* algorithm.
@@ -376,9 +442,8 @@ static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server,
376442
.createPipedOutputStream(client.getConfig(), null);
377443
// execute query in a separate thread(because async is explicitly set to true)
378444
CompletableFuture<ClickHouseResponse> future = client.connect(theServer).write().table(table)
379-
.decompressClientRequest(compression != null && compression != ClickHouseCompression.NONE,
380-
compression)
381-
.format(format).data(input = stream.getInputStream()).execute();
445+
.decompressClientRequest(compression).format(format).data(input = stream.getInputStream())
446+
.execute();
382447
try {
383448
// write data into stream in current thread
384449
writer.write(stream);
@@ -434,9 +499,7 @@ static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server,
434499
return submit(() -> {
435500
try (ClickHouseClient client = newInstance(theServer.getProtocol());
436501
ClickHouseResponse response = client.connect(theServer).write().table(table)
437-
.decompressClientRequest(compression != null && compression != ClickHouseCompression.NONE,
438-
compression)
439-
.format(format).data(input).executeAndWait()) {
502+
.decompressClientRequest(compression).format(format).data(input).executeAndWait()) {
440503
return response.getSummary();
441504
} finally {
442505
try {

Diff for: clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ public ClickHouseClient build() {
9090

9191
boolean noSelector = nodeSelector == null || nodeSelector == ClickHouseNodeSelector.EMPTY;
9292
int counter = 0;
93+
ClickHouseConfig conf = getConfig();
9394
for (ClickHouseClient c : ServiceLoader.load(ClickHouseClient.class, getClass().getClassLoader())) {
95+
c.init(conf);
96+
9497
counter++;
9598
if (noSelector || nodeSelector.match(c)) {
9699
client = c;
@@ -101,8 +104,6 @@ public ClickHouseClient build() {
101104
if (client == null) {
102105
throw new IllegalStateException(
103106
ClickHouseUtils.format("No suitable ClickHouse client(out of %d) found in classpath.", counter));
104-
} else {
105-
client.init(getConfig());
106107
}
107108

108109
return client;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package com.clickhouse.client;
2+
3+
import java.io.File;
4+
import java.io.FileInputStream;
5+
import java.io.FileNotFoundException;
6+
import java.io.FileOutputStream;
7+
import java.nio.file.Path;
8+
9+
import com.clickhouse.client.config.ClickHouseClientOption;
10+
11+
/**
12+
* Wrapper of {@link java.io.File} with additional information like compression
13+
* and format.
14+
*/
15+
public class ClickHouseFile {
16+
/**
17+
* Null file which has no compression and format.
18+
*/
19+
public static final ClickHouseFile NULL = new ClickHouseFile(null, ClickHouseCompression.NONE, 0, null);
20+
21+
public static ClickHouseFile of(File file) {
22+
return of(file, null, 0, null);
23+
}
24+
25+
public static ClickHouseFile of(Path path) {
26+
return of(ClickHouseChecker.nonNull(path, "Path").toFile(), null, 0, null);
27+
}
28+
29+
public static ClickHouseFile of(String file) {
30+
return of(new File(ClickHouseChecker.nonEmpty(file, "File")), null, 0, null);
31+
}
32+
33+
public static ClickHouseFile of(String file, ClickHouseCompression compression, int compressionLevel,
34+
ClickHouseFormat format) {
35+
return of(new File(ClickHouseChecker.nonEmpty(file, "File")), compression, compressionLevel, format);
36+
}
37+
38+
public static ClickHouseFile of(File file, ClickHouseCompression compression, int compressionLevel,
39+
ClickHouseFormat format) {
40+
return new ClickHouseFile(ClickHouseChecker.nonNull(file, "File"),
41+
compression != null ? compression : ClickHouseCompression.fromFileName(file.getName()),
42+
compressionLevel < 1 ? 0 : compressionLevel,
43+
format != null ? format : ClickHouseFormat.fromFileName(file.getName()));
44+
}
45+
46+
private final File file;
47+
private final ClickHouseCompression compress;
48+
private final int compressLevel;
49+
private final ClickHouseFormat format;
50+
51+
protected ClickHouseFile(File file, ClickHouseCompression compress, int compressLevel, ClickHouseFormat format) {
52+
this.file = file;
53+
this.compress = compress;
54+
this.compressLevel = compressLevel;
55+
this.format = format;
56+
}
57+
58+
/**
59+
* Creates an input stream for reading the file.
60+
*
61+
* @return non-null input stream for reading the file
62+
*/
63+
public ClickHouseInputStream asInputStream() {
64+
if (!isAvailable()) {
65+
return ClickHouseInputStream.empty();
66+
}
67+
68+
try {
69+
return ClickHouseInputStream.wrap(this, new FileInputStream(getFile()),
70+
(int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), null,
71+
getCompressionAlgorithm(), getCompressionLevel());
72+
} catch (FileNotFoundException e) {
73+
throw new IllegalArgumentException(e);
74+
}
75+
}
76+
77+
/**
78+
* Creates an output stream for writing data into the file.
79+
*
80+
* @return non-null input stream for writing data into the file
81+
*/
82+
public ClickHouseOutputStream asOutputStream() {
83+
if (!isAvailable()) {
84+
return ClickHouseOutputStream.empty();
85+
}
86+
87+
try {
88+
return ClickHouseOutputStream.wrap(this, new FileOutputStream(getFile()),
89+
(int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue(), null,
90+
getCompressionAlgorithm(), getCompressionLevel());
91+
} catch (FileNotFoundException e) {
92+
throw new IllegalArgumentException(e);
93+
}
94+
}
95+
96+
/**
97+
* Gets file, which only works when {@link #isAvailable()} returns {@code true}.
98+
*
99+
* @return non-null file, except {@code null} for {@link #NULL}
100+
*/
101+
public File getFile() {
102+
return file;
103+
}
104+
105+
/**
106+
* Gets file format, which could be null. Use {@link #hasFormat()} to check
107+
* first.
108+
*
109+
* @return file format, could be null
110+
*/
111+
public ClickHouseFormat getFormat() {
112+
return format;
113+
}
114+
115+
/**
116+
* Gets compression algorithm.
117+
*
118+
* @return non-null compression algorithm
119+
*/
120+
public ClickHouseCompression getCompressionAlgorithm() {
121+
return compress;
122+
}
123+
124+
/**
125+
* Gets compression level.
126+
*
127+
* @return compression level, which is always greater than or equal to zero
128+
*/
129+
public int getCompressionLevel() {
130+
return compressLevel;
131+
}
132+
133+
/**
134+
* Checks if the file format is defined or not.
135+
*
136+
* @return true if the file format is defined; false otherwise
137+
*/
138+
public boolean hasFormat() {
139+
return format != null;
140+
}
141+
142+
/**
143+
* Checks if the file is available or not.
144+
*
145+
* @return true if the file is available; false otherwise
146+
*/
147+
public boolean isAvailable() {
148+
return file != null && file.exists();
149+
}
150+
151+
/**
152+
* Checks if the file is compressed or not.
153+
*
154+
* @return true if the file is compressed; false otherwise
155+
*/
156+
public boolean isCompressed() {
157+
return compress != ClickHouseCompression.NONE;
158+
}
159+
}

Diff for: clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFormat.java

+55
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,61 @@ public enum ClickHouseFormat {
6868
Vertical(false, true, false, false, false), // https://clickhouse.com/docs/en/interfaces/formats/#vertical
6969
XML(false, true, false, false, false); // https://clickhouse.com/docs/en/interfaces/formats/#xml
7070

71+
/**
72+
* Gets format based on given file name.
73+
*
74+
* @param file file name
75+
* @return format, could be null
76+
*/
77+
public static ClickHouseFormat fromFileName(String file) {
78+
ClickHouseCompression compression = ClickHouseCompression.fromFileName(file);
79+
if (compression != ClickHouseCompression.NONE) {
80+
file = file.substring(0, file.lastIndexOf('.'));
81+
}
82+
ClickHouseFormat format = null;
83+
84+
int index = 0;
85+
if (file != null && (index = file.lastIndexOf('.')) > 0) {
86+
String ext = file.substring(index + 1).toLowerCase();
87+
switch (ext) {
88+
case "arrow":
89+
format = Arrow;
90+
break;
91+
case "avro":
92+
format = Avro;
93+
break;
94+
case "capnp":
95+
format = CapnProto;
96+
break;
97+
case "csv":
98+
format = CSV;
99+
break;
100+
case "json":
101+
format = JSONEachRow;
102+
break;
103+
case "msgpack":
104+
format = MsgPack;
105+
break;
106+
case "orc":
107+
format = ORC;
108+
break;
109+
case "parquet":
110+
format = Parquet;
111+
break;
112+
case "tsv":
113+
format = TSV;
114+
break;
115+
case "xml":
116+
format = XML;
117+
break;
118+
default:
119+
break;
120+
}
121+
}
122+
123+
return format;
124+
}
125+
71126
private final boolean input;
72127
private final boolean output;
73128
private final boolean binary;

0 commit comments

Comments
 (0)