Skip to content

Commit a9b4dab

Browse files
authored
Merge pull request #969 from zhicwu/lz4
minor changes to enhance lz4 support and cli-client
2 parents 90e80ff + 6f82de2 commit a9b4dab

File tree

5 files changed

+36
-21
lines changed

5 files changed

+36
-21
lines changed

clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
144144
ClickHouseChecker.isNullOrBlank(hostDir) ? System.getProperty("java.io.tmpdir") : hostDir);
145145
String containerDir = (String) config.getOption(ClickHouseCommandLineOption.CLI_CONTAINER_DIRECTORY);
146146
if (ClickHouseChecker.isNullOrBlank(containerDir)) {
147-
containerDir = "/data/";
147+
containerDir = "/tmp/";
148148
} else {
149149
containerDir = ClickHouseUtils.normalizeDirectory(containerDir);
150150
}

clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/config/ClickHouseCommandLineOption.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ public enum ClickHouseCommandLineOption implements ClickHouseOption {
3737
/**
3838
* Work directory inside container, only works running in docker mode(when
3939
* {@link #CLICKHOUSE_CLI_PATH} is not available). Empty value is treated as
40-
* '/data'.
40+
* '/tmp'.
4141
*/
4242
CLI_CONTAINER_DIRECTORY("cli_container_directory", "",
43-
"Work directory inside container, empty value is treated as '/data'"),
43+
"Work directory inside container, empty value is treated as '/tmp'"),
4444
/**
4545
* Command-line work directory. Empty value is treated as system temporary
4646
* directory(e.g. {@code System.getProperty("java.io.tmpdir")}). When running in

clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4InputStream.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class Lz4InputStream extends AbstractByteArrayInputStream {
2626
private final InputStream stream;
2727
private final byte[] header;
2828

29+
private byte[] compressedBlock;
30+
2931
private boolean readFully(byte[] b, int off, int len) throws IOException {
3032
int n = 0;
3133
while (n < len) {
@@ -61,7 +63,8 @@ protected int updateBuffer() throws IOException {
6163
// 4 bytes - size of uncompressed data
6264
int uncompressedSize = BinaryStreamUtils.toInt32(header, 21);
6365
int offset = 9;
64-
byte[] block = new byte[compressedSizeWithHeader];
66+
final byte[] block = compressedBlock.length >= compressedSizeWithHeader ? compressedBlock
67+
: (compressedBlock = new byte[compressedSizeWithHeader]);
6568
block[0] = header[16];
6669
BinaryStreamUtils.setInt32(block, 1, compressedSizeWithHeader);
6770
BinaryStreamUtils.setInt32(block, 5, uncompressedSize);
@@ -70,17 +73,17 @@ protected int updateBuffer() throws IOException {
7073
throw new IOException(ClickHouseUtils.format(ERROR_INCOMPLETE_READ, 0, compressedSizeWithHeader - offset));
7174
}
7275

73-
long[] real = ClickHouseCityHash.cityHash128(block, 0, block.length);
76+
long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader);
7477
if (real[0] != BinaryStreamUtils.toInt64(header, 0) || real[1] != BinaryStreamUtils.toInt64(header, 8)) {
7578
throw new IOException("Checksum doesn't match: corrupted data.");
7679
}
7780

78-
buffer = new byte[uncompressedSize];
79-
decompressor.decompress(block, offset, buffer, 0, uncompressedSize);
81+
final byte[] buf = buffer.length >= uncompressedSize ? buffer : (buffer = new byte[uncompressedSize]);
82+
decompressor.decompress(block, offset, buf, 0, uncompressedSize);
8083
if (copyTo != null) {
81-
copyTo.write(buffer);
84+
copyTo.write(buf);
8285
}
83-
return limit = buffer.length;
86+
return limit = uncompressedSize;
8487
}
8588

8689
public Lz4InputStream(InputStream stream) {
@@ -93,6 +96,8 @@ public Lz4InputStream(ClickHouseFile file, InputStream stream, Runnable postClos
9396
this.decompressor = factory.fastDecompressor();
9497
this.stream = ClickHouseChecker.nonNull(stream, "InputStream");
9598
this.header = new byte[HEADER_LENGTH];
99+
100+
this.compressedBlock = ClickHouseByteBuffer.EMPTY_BYTES;
96101
}
97102

98103
@Override

clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,23 @@ public class Lz4OutputStream extends AbstractByteArrayOutputStream {
2121

2222
@Override
2323
protected void flushBuffer() throws IOException {
24-
int compressed = compressor.compress(buffer, 0, position, compressedBlock, 25);
24+
byte[] block = compressedBlock;
25+
block[16] = Lz4InputStream.MAGIC;
26+
int compressed = compressor.compress(buffer, 0, position, block, 25);
2527
int compressedSizeWithHeader = compressed + 9;
26-
BinaryStreamUtils.setInt32(compressedBlock, 17, compressedSizeWithHeader); // compressed size with header
27-
BinaryStreamUtils.setInt32(compressedBlock, 21, position); // uncompressed size
28-
long[] hash = ClickHouseCityHash.cityHash128(compressedBlock, 16, compressedSizeWithHeader);
29-
BinaryStreamUtils.setInt64(compressedBlock, 0, hash[0]);
30-
BinaryStreamUtils.setInt64(compressedBlock, 8, hash[1]);
31-
output.write(compressedBlock, 0, compressed + 25);
28+
BinaryStreamUtils.setInt32(block, 17, compressedSizeWithHeader); // compressed size with header
29+
BinaryStreamUtils.setInt32(block, 21, position); // uncompressed size
30+
long[] hash = ClickHouseCityHash.cityHash128(block, 16, compressedSizeWithHeader);
31+
BinaryStreamUtils.setInt64(block, 0, hash[0]);
32+
BinaryStreamUtils.setInt64(block, 8, hash[1]);
33+
output.write(block, 0, compressed + 25);
3234
position = 0;
3335
}
3436

3537
@Override
3638
protected void flushBuffer(byte[] bytes, int offset, int length) throws IOException {
3739
int maxLen = compressor.maxCompressedLength(length) + 15;
38-
byte[] block = maxLen < compressedBlock.length ? compressedBlock : new byte[maxLen];
40+
byte[] block = maxLen <= compressedBlock.length ? compressedBlock : new byte[maxLen];
3941
block[16] = Lz4InputStream.MAGIC;
4042

4143
int compressed = compressor.compress(bytes, offset, length, block, 25);
@@ -61,7 +63,6 @@ public Lz4OutputStream(ClickHouseFile file, OutputStream stream, int maxCompress
6163
compressor = factory.fastCompressor();
6264
// reserve the first 9 bytes for calculating checksum
6365
compressedBlock = new byte[compressor.maxCompressedLength(maxCompressBlockSize) + 15];
64-
compressedBlock[16] = Lz4InputStream.MAGIC;
6566
}
6667

6768
@Override

clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -205,18 +205,28 @@ public void testOpenCloseClient() throws Exception {
205205

206206
@Test(dataProvider = "compressionMatrix", groups = { "integration" })
207207
public void testCompression(ClickHouseFormat format, ClickHouseBufferingMode bufferingMode,
208-
boolean compressRequest, boolean compressResponse) throws ClickHouseException {
208+
boolean compressRequest, boolean compressResponse) throws Exception {
209209
ClickHouseNode server = getServer();
210210
String uuid = UUID.randomUUID().toString();
211+
ClickHouseClient.send(server, "create table if not exists test_compress_decompress(id UUID)engine=Memory")
212+
.get();
211213
try (ClickHouseClient client = getClient()) {
212214
ClickHouseRequest<?> request = client.connect(server)
213215
.format(format)
214216
.option(ClickHouseClientOption.RESPONSE_BUFFERING, bufferingMode)
215217
.compressServerResponse(compressResponse)
216218
.decompressClientRequest(compressRequest);
219+
// start with insert
220+
try (ClickHouseResponse resp = request
221+
.query("insert into test_compress_decompress values(:uuid)").params(ClickHouseStringValue.of(uuid))
222+
.executeAndWait()) {
223+
Assert.assertNotNull(resp);
224+
}
225+
217226
boolean hasResult = false;
218227
try (ClickHouseResponse resp = request
219-
.query("select :uuid").params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
228+
.query("select id from test_compress_decompress where id = :uuid")
229+
.params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
220230
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), uuid);
221231
hasResult = true;
222232
}
@@ -966,7 +976,6 @@ public void testCustomWriter() throws Exception {
966976

967977
@Test(groups = { "integration" })
968978
public void testDumpAndLoadFile() throws Exception {
969-
// super.testLoadRawData();
970979
ClickHouseNode server = getServer();
971980
ClickHouseClient.send(server, "drop table if exists test_dump_load_file",
972981
"create table test_dump_load_file(a UInt64, b Nullable(String)) engine=MergeTree() order by tuple()")

0 commit comments

Comments
 (0)