Skip to content

Commit 7dcfae5

Browse files
authored
Merge pull request #1004 from zhicwu/develop
support direct upload/download in http client
2 parents b1b4cc1 + d1a97b1 commit 7dcfae5

File tree

8 files changed

+170
-32
lines changed

8 files changed

+170
-32
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public ClickHouseInputStream asInputStream() {
6868
try {
6969
return ClickHouseInputStream.wrap(this, new FileInputStream(getFile()),
7070
(int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), null,
71-
getCompressionAlgorithm(), getCompressionLevel());
71+
ClickHouseCompression.NONE, getCompressionLevel());
7272
} catch (FileNotFoundException e) {
7373
throw new IllegalArgumentException(e);
7474
}
@@ -87,7 +87,7 @@ public ClickHouseOutputStream asOutputStream() {
8787
try {
8888
return ClickHouseOutputStream.wrap(this, new FileOutputStream(getFile()),
8989
(int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue(), null,
90-
getCompressionAlgorithm(), getCompressionLevel());
90+
ClickHouseCompression.NONE, getCompressionLevel());
9191
} catch (FileNotFoundException e) {
9292
throw new IllegalArgumentException(e);
9393
}

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public static ClickHouseInputStream of(ClickHouseFile file, int bufferSize, Runn
165165
}
166166
try {
167167
return wrap(file, new FileInputStream(file.getFile()), bufferSize, postCloseAction,
168-
file.getCompressionAlgorithm(), file.getCompressionLevel());
168+
ClickHouseCompression.NONE, file.getCompressionLevel());
169169
} catch (FileNotFoundException e) {
170170
throw new IllegalArgumentException(e);
171171
}

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public static ClickHouseOutputStream of(ClickHouseFile file, int bufferSize, Run
8787
}
8888
try {
8989
return wrap(file, new FileOutputStream(file.getFile()), bufferSize, postCloseAction,
90-
file.getCompressionAlgorithm(), file.getCompressionLevel());
90+
ClickHouseCompression.NONE, file.getCompressionLevel());
9191
} catch (FileNotFoundException e) {
9292
throw new IllegalArgumentException(e);
9393
}

clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

+118
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@
22

33
import java.io.BufferedReader;
44
import java.io.ByteArrayInputStream;
5+
import java.io.ByteArrayOutputStream;
6+
import java.io.File;
7+
import java.io.FileInputStream;
8+
import java.io.FileOutputStream;
59
import java.io.IOException;
610
import java.io.InputStream;
711
import java.io.InputStreamReader;
12+
import java.io.OutputStream;
813
import java.io.UncheckedIOException;
914
import java.math.BigDecimal;
1015
import java.math.BigInteger;
@@ -24,6 +29,8 @@
2429
import java.util.concurrent.ExecutionException;
2530
import java.util.concurrent.atomic.AtomicInteger;
2631
import java.util.stream.Collectors;
32+
import java.util.zip.GZIPInputStream;
33+
import java.util.zip.GZIPOutputStream;
2734

2835
import com.clickhouse.client.ClickHouseClientBuilder.Agent;
2936
import com.clickhouse.client.config.ClickHouseBufferingMode;
@@ -119,6 +126,16 @@ protected Object[][] getCompressionMatrix() {
119126
return array;
120127
}
121128

129+
@DataProvider(name = "fileProcessMatrix")
130+
protected Object[][] getFileProcessMatrix() {
131+
return new Object[][] {
132+
{ true, true },
133+
{ true, false },
134+
{ false, true },
135+
{ false, false },
136+
};
137+
}
138+
122139
@DataProvider(name = "renameMethods")
123140
protected Object[][] getRenameMethods() {
124141
return new Object[][] {
@@ -1035,6 +1052,47 @@ public void testDump() throws Exception {
10351052
Files.delete(temp);
10361053
}
10371054

1055+
@Test(dataProvider = "fileProcessMatrix", groups = "integration")
1056+
public void testDumpFile(boolean gzipCompressed, boolean useOneLiner) throws Exception {
1057+
ClickHouseNode server = getServer();
1058+
if (server.getProtocol() != ClickHouseProtocol.HTTP) {
1059+
throw new SkipException("Skip as only http implementation works well");
1060+
}
1061+
1062+
File file = File.createTempFile("chc", ".data");
1063+
ClickHouseFile wrappedFile = ClickHouseFile.of(file,
1064+
gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, 0,
1065+
ClickHouseFormat.CSV);
1066+
String query = "select number, if(number % 2 = 0, null, toString(number)) str from numbers(10)";
1067+
if (useOneLiner) {
1068+
ClickHouseClient.dump(server, query, wrappedFile).get();
1069+
} else {
1070+
try (ClickHouseClient client = getClient();
1071+
ClickHouseResponse response = client.connect(server).query(query).output(wrappedFile)
1072+
.executeAndWait()) {
1073+
// ignore
1074+
}
1075+
}
1076+
try (InputStream in = gzipCompressed ? new GZIPInputStream(new FileInputStream(file))
1077+
: new FileInputStream(file); ByteArrayOutputStream out = new ByteArrayOutputStream()) {
1078+
ClickHouseInputStream.pipe(in, out, 512);
1079+
String content = new String(out.toByteArray(), StandardCharsets.US_ASCII);
1080+
StringBuilder builder = new StringBuilder();
1081+
for (int i = 0; i < 10; i++) {
1082+
builder.append(i).append(',');
1083+
if (i % 2 == 0) {
1084+
builder.append("\\N");
1085+
} else {
1086+
builder.append('"').append(i).append('"');
1087+
}
1088+
builder.append('\n');
1089+
}
1090+
Assert.assertEquals(content, builder.toString());
1091+
} finally {
1092+
file.delete();
1093+
}
1094+
}
1095+
10381096
@Test(groups = { "integration" })
10391097
public void testCustomLoad() throws Exception {
10401098
ClickHouseNode server = getServer();
@@ -1117,6 +1175,66 @@ public void testLoadCsv() throws Exception {
11171175
}
11181176
}
11191177

1178+
@Test(dataProvider = "fileProcessMatrix", groups = "integration")
1179+
public void testLoadFile(boolean gzipCompressed, boolean useOneLiner) throws Exception {
1180+
ClickHouseNode server = getServer();
1181+
if (server.getProtocol() != ClickHouseProtocol.HTTP) {
1182+
throw new SkipException("Skip as only http implementation works well");
1183+
}
1184+
1185+
File file = File.createTempFile("chc", ".data");
1186+
Object[][] data = new Object[][] {
1187+
{ 1, "12345" },
1188+
{ 2, "23456" },
1189+
{ 3, "\\N" },
1190+
{ 4, "x" },
1191+
{ 5, "y" },
1192+
};
1193+
try (OutputStream out = gzipCompressed ? new GZIPOutputStream(new FileOutputStream(file))
1194+
: new FileOutputStream(file)) {
1195+
for (Object[] row : data) {
1196+
out.write((row[0] + "," + row[1]).getBytes(StandardCharsets.US_ASCII));
1197+
if ((int) row[0] != 5) {
1198+
out.write(10);
1199+
}
1200+
}
1201+
out.flush();
1202+
}
1203+
1204+
ClickHouseClient.send(server, "drop table if exists test_load_file",
1205+
"create table test_load_file(a Int32, b Nullable(String))engine=Memory").get();
1206+
ClickHouseFile wrappedFile = ClickHouseFile.of(file,
1207+
gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, 0,
1208+
ClickHouseFormat.CSV);
1209+
if (useOneLiner) {
1210+
ClickHouseClient
1211+
.load(server, "test_load_file", wrappedFile)
1212+
.get();
1213+
} else {
1214+
try (ClickHouseClient client = getClient();
1215+
ClickHouseResponse response = client.connect(server).write().table("test_load_file")
1216+
.data(wrappedFile).executeAndWait()) {
1217+
// ignore
1218+
}
1219+
}
1220+
try (ClickHouseClient client = getClient();
1221+
ClickHouseResponse response = client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
1222+
.query("select * from test_load_file order by a").executeAndWait()) {
1223+
int row = 0;
1224+
for (ClickHouseRecord r : response.records()) {
1225+
Assert.assertEquals(r.getValue(0).asObject(), data[row][0]);
1226+
if (row == 2) {
1227+
Assert.assertNull(r.getValue(1).asObject());
1228+
} else {
1229+
Assert.assertEquals(r.getValue(1).asObject(), data[row][1]);
1230+
}
1231+
row++;
1232+
}
1233+
} finally {
1234+
file.delete();
1235+
}
1236+
}
1237+
11201238
@Test(groups = { "integration" })
11211239
public void testLoadRawData() throws Exception {
11221240
ClickHouseNode server = getServer();

clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.clickhouse.client.http;
22

33
import java.io.IOException;
4-
import java.io.InputStream;
54
import java.io.UnsupportedEncodingException;
65
import java.net.URLEncoder;
76
import java.nio.charset.Charset;
@@ -17,6 +16,7 @@
1716
import com.clickhouse.client.ClickHouseCompression;
1817
import com.clickhouse.client.ClickHouseConfig;
1918
import com.clickhouse.client.ClickHouseCredentials;
19+
import com.clickhouse.client.ClickHouseInputStream;
2020
import com.clickhouse.client.ClickHouseNode;
2121
import com.clickhouse.client.ClickHouseOutputStream;
2222
import com.clickhouse.client.ClickHouseRequest;
@@ -59,15 +59,22 @@ static String buildQueryParams(ClickHouseRequest<?> request) {
5959
appendQueryParameter(builder, cp.getKey(), cp.getValue());
6060
}
6161

62-
if (config.isResponseCompressed()) {
63-
// request server to compress response
64-
appendQueryParameter(builder, "compress", "1");
65-
}
66-
if (config.isRequestCompressed()) {
62+
ClickHouseInputStream chIn = request.getInputStream().orElse(null);
63+
if (chIn != null && chIn.getUnderlyingFile().isAvailable()) {
64+
appendQueryParameter(builder, "query", request.getStatements().get(0));
65+
} else if (config.isRequestCompressed()) {
6766
// inform server that client's request is compressed
6867
appendQueryParameter(builder, "decompress", "1");
6968
}
7069

70+
ClickHouseOutputStream chOut = request.getOutputStream().orElse(null);
71+
if (chOut != null && chOut.getUnderlyingFile().isAvailable()) {
72+
appendQueryParameter(builder, "enable_http_compression", "1");
73+
} else if (config.isResponseCompressed()) {
74+
// request server to compress response
75+
appendQueryParameter(builder, "compress", "1");
76+
}
77+
7178
Map<String, Object> settings = request.getSettings();
7279
List<String> stmts = request.getStatements(false);
7380
String settingKey = "max_execution_time";
@@ -263,8 +270,9 @@ protected Map<String, String> mergeHeaders(Map<String, String> requestHeaders) {
263270
* @throws IOException when error occurred posting request and/or server failed
264271
* to respond
265272
*/
266-
protected abstract ClickHouseHttpResponse post(String query, InputStream data, List<ClickHouseExternalTable> tables,
267-
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException;
273+
protected abstract ClickHouseHttpResponse post(String query, ClickHouseInputStream data,
274+
List<ClickHouseExternalTable> tables, String url, Map<String, String> headers, ClickHouseConfig config)
275+
throws IOException;
268276

269277
/**
270278
* Checks whether the connection is reusable or not. This method will be called
@@ -296,11 +304,11 @@ public ClickHouseHttpResponse update(String query, Map<String, String> headers)
296304
return post(query, null, null, null, headers, null);
297305
}
298306

299-
public ClickHouseHttpResponse update(String query, InputStream data) throws IOException {
307+
public ClickHouseHttpResponse update(String query, ClickHouseInputStream data) throws IOException {
300308
return post(query, data, null, null, null, null);
301309
}
302310

303-
public ClickHouseHttpResponse update(String query, InputStream data, Map<String, String> headers)
311+
public ClickHouseHttpResponse update(String query, ClickHouseInputStream data, Map<String, String> headers)
304312
throws IOException {
305313
return post(query, data, null, null, headers, null);
306314
}

clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ private ClickHouseHttpResponse buildResponse() throws IOException {
7373
ClickHouseConfig c = config;
7474
ClickHouseFormat format = c.getFormat();
7575
TimeZone timeZone = c.getServerTimeZone();
76+
boolean hasOutputFile = output != null && output.getUnderlyingFile().isAvailable();
7677
boolean hasQueryResult = false;
7778
// queryId, format and timeZone are only available for queries
7879
if (!ClickHouseChecker.isNullOrEmpty(queryId)) {
@@ -102,8 +103,9 @@ private ClickHouseHttpResponse buildResponse() throws IOException {
102103
action = null;
103104
}
104105
return new ClickHouseHttpResponse(this,
105-
hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action)
106-
: ClickHouseClient.getResponseInputStream(c, source, action),
106+
hasOutputFile ? ClickHouseInputStream.of(source, c.getReadBufferSize(), action)
107+
: (hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action)
108+
: ClickHouseClient.getResponseInputStream(c, source, action)),
107109
displayName, queryId, summary, format, timeZone);
108110
}
109111

@@ -202,7 +204,7 @@ protected boolean isReusable() {
202204
}
203205

204206
@Override
205-
protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHouseExternalTable> tables,
207+
protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables,
206208
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException {
207209
Charset charset = StandardCharsets.US_ASCII;
208210
byte[] boundary = null;
@@ -216,16 +218,19 @@ protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHo
216218
setHeaders(conn, headers);
217219

218220
ClickHouseConfig c = config;
221+
final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable();
219222
final boolean hasInput = data != null || boundary != null;
220223
if (hasInput) {
221224
conn.setChunkedStreamingMode(config.getRequestChunkSize());
222225
} else {
223226
// TODO conn.setFixedLengthStreamingMode(contentLength);
224227
}
225-
try (ClickHouseOutputStream out = hasInput
226-
? ClickHouseClient.getAsyncRequestOutputStream(config, conn.getOutputStream(), null) // latch::countDown)
227-
: ClickHouseClient.getRequestOutputStream(c, conn.getOutputStream(), null)) {
228-
byte[] sqlBytes = sql.getBytes(StandardCharsets.UTF_8);
228+
try (ClickHouseOutputStream out = hasFile
229+
? ClickHouseOutputStream.of(conn.getOutputStream(), config.getWriteBufferSize())
230+
: (hasInput
231+
? ClickHouseClient.getAsyncRequestOutputStream(config, conn.getOutputStream(), null) // latch::countDown)
232+
: ClickHouseClient.getRequestOutputStream(c, conn.getOutputStream(), null))) {
233+
byte[] sqlBytes = hasFile ? new byte[0] : sql.getBytes(StandardCharsets.UTF_8);
229234
if (boundary != null) {
230235
byte[] linePrefix = new byte[] { '\r', '\n', '-', '-' };
231236
byte[] lineSuffix = new byte[] { '\r', '\n' };
@@ -268,7 +273,7 @@ protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHo
268273
out.writeBytes(sqlBytes);
269274
if (data != null && data.available() > 0) {
270275
// append \n
271-
if (sqlBytes[sqlBytes.length - 1] != (byte) '\n') {
276+
if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') {
272277
out.write(10);
273278
}
274279
ClickHouseInputStream.pipe(data, out, c.getWriteBufferSize());

clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.clickhouse.client.ClickHouseFormat;
88
import com.clickhouse.client.ClickHouseInputStream;
99
import com.clickhouse.client.ClickHouseNode;
10+
import com.clickhouse.client.ClickHouseOutputStream;
1011
import com.clickhouse.client.ClickHousePipedOutputStream;
1112
import com.clickhouse.client.ClickHouseRequest;
1213
import com.clickhouse.client.ClickHouseSslContextProvider;
@@ -98,6 +99,7 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, HttpRespon
9899
: timeZone;
99100
}
100101

102+
boolean hasOutputFile = output != null && output.getUnderlyingFile().isAvailable();
101103
final InputStream source;
102104
final Runnable action;
103105
if (output != null) {
@@ -117,8 +119,9 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, HttpRespon
117119
}
118120

119121
return new ClickHouseHttpResponse(this,
120-
ClickHouseInputStream.wrap(null, source, config.getReadBufferSize(), action,
121-
config.getResponseCompressAlgorithm(), config.getResponseCompressLevel()),
122+
hasOutputFile ? ClickHouseInputStream.of(source, config.getReadBufferSize(), action)
123+
: ClickHouseInputStream.wrap(null, source, config.getReadBufferSize(), action,
124+
config.getResponseCompressAlgorithm(), config.getResponseCompressLevel()),
122125
displayName, queryId, summary, format, timeZone);
123126
}
124127

@@ -194,7 +197,8 @@ private CompletableFuture<HttpResponse<InputStream>> postRequest(HttpRequest req
194197
}
195198

196199
private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.Builder reqBuilder, String boundary,
197-
String sql, InputStream data, List<ClickHouseExternalTable> tables) throws IOException {
200+
String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables) throws IOException {
201+
final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable();
198202
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config,
199203
null);
200204
reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInputStream));
@@ -228,12 +232,14 @@ private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.B
228232
writer.write("\r\n--" + boundary + "--\r\n");
229233
writer.flush();
230234
} else {
231-
writer.write(sql);
232-
writer.flush();
235+
if (!hasFile) {
236+
writer.write(sql);
237+
writer.flush();
238+
}
233239

234240
if (data.available() > 0) {
235241
// append \n
236-
if (sql.charAt(sql.length() - 1) != '\n') {
242+
if (!hasFile && sql.charAt(sql.length() - 1) != '\n') {
237243
stream.write(10);
238244
}
239245

@@ -281,7 +287,7 @@ private ClickHouseHttpResponse postString(ClickHouseConfig config, HttpRequest.B
281287
}
282288

283289
@Override
284-
protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHouseExternalTable> tables,
290+
protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables,
285291
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException {
286292
ClickHouseConfig c = config == null ? this.config : config;
287293
HttpRequest.Builder reqBuilder = HttpRequest.newBuilder()

0 commit comments

Comments
 (0)