Skip to content

Commit 0317788

Browse files
authored
Merge pull request #1174 from zhicwu/develop
support more compression algorithms
2 parents 60b8ca8 + 573d06e commit 0317788

File tree

23 files changed

+732
-422
lines changed

23 files changed

+732
-422
lines changed

clickhouse-client/pom.xml

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
24
<modelVersion>4.0.0</modelVersion>
35

46
<parent>
@@ -18,7 +20,7 @@
1820
<dependency>
1921
<groupId>${project.parent.groupId}</groupId>
2022
<artifactId>org.roaringbitmap</artifactId>
21-
<scope>provided</scope>
23+
<optional>true</optional>
2224
<exclusions>
2325
<exclusion>
2426
<groupId>*</groupId>
@@ -27,50 +29,70 @@
2729
</exclusions>
2830
</dependency>
2931

32+
<dependency>
33+
<groupId>com.aayushatharva.brotli4j</groupId>
34+
<artifactId>brotli4j</artifactId>
35+
<optional>true</optional>
36+
</dependency>
3037
<dependency>
3138
<groupId>com.github.ben-manes.caffeine</groupId>
3239
<artifactId>caffeine</artifactId>
33-
<scope>provided</scope>
40+
<optional>true</optional>
41+
</dependency>
42+
<dependency>
43+
<groupId>com.github.luben</groupId>
44+
<artifactId>zstd-jni</artifactId>
45+
<optional>true</optional>
3446
</dependency>
3547
<dependency>
3648
<groupId>com.google.code.gson</groupId>
3749
<artifactId>gson</artifactId>
38-
<scope>provided</scope>
50+
<optional>true</optional>
3951
</dependency>
4052
<dependency>
4153
<groupId>dnsjava</groupId>
4254
<artifactId>dnsjava</artifactId>
43-
<scope>provided</scope>
55+
<optional>true</optional>
4456
</dependency>
4557
<dependency>
4658
<groupId>org.apache.avro</groupId>
4759
<artifactId>avro</artifactId>
48-
<scope>provided</scope>
60+
<optional>true</optional>
61+
</dependency>
62+
<dependency>
63+
<groupId>org.brotli</groupId>
64+
<artifactId>dec</artifactId>
65+
<optional>true</optional>
4966
</dependency>
5067
<dependency>
5168
<groupId>org.jctools</groupId>
5269
<artifactId>jctools-core</artifactId>
53-
<scope>provided</scope>
70+
<optional>true</optional>
5471
</dependency>
5572
<dependency>
5673
<groupId>org.lz4</groupId>
5774
<artifactId>lz4-java</artifactId>
58-
<scope>provided</scope>
75+
<optional>true</optional>
5976
</dependency>
6077
<dependency>
6178
<groupId>org.msgpack</groupId>
6279
<artifactId>msgpack-core</artifactId>
63-
<scope>provided</scope>
80+
<optional>true</optional>
6481
</dependency>
6582
<dependency>
6683
<groupId>org.slf4j</groupId>
6784
<artifactId>slf4j-api</artifactId>
68-
<scope>provided</scope>
85+
<optional>true</optional>
86+
</dependency>
87+
<dependency>
88+
<groupId>org.tukaani</groupId>
89+
<artifactId>xz</artifactId>
90+
<optional>true</optional>
6991
</dependency>
7092
<dependency>
7193
<groupId>org.xerial.snappy</groupId>
7294
<artifactId>snappy-java</artifactId>
73-
<scope>provided</scope>
95+
<optional>true</optional>
7496
</dependency>
7597

7698
<dependency>

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ static ClickHouseOutputStream getRequestOutputStream(ClickHouseConfig config, Ou
7575
Runnable postCloseAction) {
7676
if (config == null) {
7777
return ClickHouseOutputStream.of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(),
78-
ClickHouseCompression.NONE, postCloseAction);
78+
ClickHouseCompression.NONE, -1, postCloseAction);
7979
}
8080

8181
return ClickHouseOutputStream.of(output, config.getWriteBufferSize(), config.getRequestCompressAlgorithm(),
82-
postCloseAction);
82+
config.getRequestCompressLevel(), postCloseAction);
8383
}
8484

8585
/**

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseCompression.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ public enum ClickHouseCompression {
1111
BZ2("application/x-bzip2", "bz2", "bz2"),
1212
DEFLATE("application/deflate", "deflate", "zz"),
1313
GZIP("application/gzip", "gzip", "gz"),
14-
LZMA("application/x-lzma", "lzma", "xz"),
1514
LZ4("application/x-lz4", "lz4", "lz4"),
16-
ZIP("application/zip", "zip", "zip"),
15+
SNAPPY("application/x-snappy", "snappy", "sz"),
16+
XZ("application/x-xz", "xz", "xz"),
1717
ZSTD("application/zstd", "zstd", "zst");
1818

1919
private String mimeType;

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -378,12 +378,12 @@ public ClickHouseCompression getResponseCompressAlgorithm() {
378378

379379
/**
380380
* Gets input compress level. When {@link #isResponseCompressed()} is
381-
* {@code false}, this will return {@code 0}.
381+
* {@code false}, this will return {@code -1}.
382382
*
383383
* @return compress level
384384
*/
385385
public int getResponseCompressLevel() {
386-
return decompressResponse ? decompressLevel : 0;
386+
return decompressResponse ? decompressLevel : -1;
387387
}
388388

389389
/**
@@ -407,12 +407,12 @@ public ClickHouseCompression getRequestCompressAlgorithm() {
407407

408408
/**
409409
* Gets input compress level. When {@link #isRequestCompressed()} is
410-
* {@code false}, this will return {@code 0}.
410+
* {@code false}, this will return {@code -1}.
411411
*
412412
* @return compress level
413413
*/
414414
public int getRequestCompressLevel() {
415-
return compressRequest ? compressLevel : 0;
415+
return compressRequest ? compressLevel : -1;
416416
}
417417

418418
public int getConnectionTimeout() {

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public ClickHouseCompression getCompressionAlgorithm() {
127127
/**
128128
* Gets compression level.
129129
*
130-
* @return compression level, which is always greater than or equal to zero
130+
* @return compression level, which in general should be greater than or equal
131+
* to zero
131132
*/
132133
public int getCompressionLevel() {
133134
return compressLevel;

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import java.util.concurrent.TimeoutException;
2525
import java.util.function.Function;
2626
import java.util.zip.GZIPInputStream;
27+
import java.util.zip.InflaterInputStream;
2728

2829
import com.clickhouse.client.config.ClickHouseClientOption;
2930
import com.clickhouse.client.stream.BlockingInputStream;
31+
import com.clickhouse.client.stream.CompressionUtils;
3032
import com.clickhouse.client.stream.DeferredInputStream;
3133
import com.clickhouse.client.stream.EmptyInputStream;
3234
import com.clickhouse.client.stream.Lz4InputStream;
@@ -71,18 +73,42 @@ public static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input,
7173
: new WrappedInputStream(file, input, bufferSize, postCloseAction);
7274
} else {
7375
switch (compression) {
76+
case BROTLI:
77+
chInput = new WrappedInputStream(file, CompressionUtils.createBrotliInputStream(input, bufferSize),
78+
bufferSize, postCloseAction);
79+
break;
80+
case BZ2:
81+
chInput = new WrappedInputStream(file, CompressionUtils.createBz2InputStream(input), bufferSize,
82+
postCloseAction);
83+
break;
84+
case DEFLATE:
85+
chInput = new WrappedInputStream(file, new InflaterInputStream(input), bufferSize, postCloseAction);
86+
break;
7487
case GZIP:
7588
try {
7689
chInput = new WrappedInputStream(file, new GZIPInputStream(input), bufferSize, postCloseAction);
7790
} catch (IOException e) {
78-
throw new IllegalArgumentException("Failed to wrap input stream", e);
91+
throw new IllegalArgumentException(CompressionUtils.ERROR_FAILED_TO_WRAP_INPUT, e);
7992
}
8093
break;
8194
case LZ4:
8295
chInput = new Lz4InputStream(file, input, postCloseAction);
8396
break;
97+
case SNAPPY:
98+
// https://github.com/ClickHouse/ClickHouse/issues/44885
99+
chInput = new WrappedInputStream(file, CompressionUtils.createSnappyInputStream(input),
100+
bufferSize, postCloseAction);
101+
break;
102+
case ZSTD:
103+
chInput = new WrappedInputStream(file, CompressionUtils.createZstdInputStream(input), bufferSize,
104+
postCloseAction);
105+
break;
106+
case XZ:
107+
chInput = new WrappedInputStream(file, CompressionUtils.createXzInputStream(input), bufferSize,
108+
postCloseAction);
109+
break;
84110
default:
85-
throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression);
111+
throw new UnsupportedOperationException("Unsupported decompression algorithm: " + compression);
86112
}
87113
}
88114
return chInput;

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import java.nio.ByteBuffer;
88
import java.nio.charset.Charset;
99
import java.nio.charset.StandardCharsets;
10-
import java.util.zip.GZIPOutputStream;
1110

1211
import com.clickhouse.client.config.ClickHouseClientOption;
12+
import com.clickhouse.client.stream.CompressionUtils;
1313
import com.clickhouse.client.stream.EmptyOutputStream;
1414
import com.clickhouse.client.stream.Lz4OutputStream;
1515
import com.clickhouse.client.stream.WrappedOutputStream;
@@ -42,18 +42,54 @@ static ClickHouseOutputStream wrap(ClickHouseFile file, OutputStream output, int
4242
if (compression == null || compression == ClickHouseCompression.NONE) {
4343
chOutput = new WrappedOutputStream(file, output, bufferSize, postCloseAction);
4444
} else {
45+
// never got brotli, bz2, deflate, gzip, and xz working :<
4546
switch (compression) {
46-
case GZIP:
47-
try {
48-
chOutput = new WrappedOutputStream(file, new GZIPOutputStream(output), bufferSize,
49-
postCloseAction);
50-
} catch (IOException e) {
51-
throw new IllegalArgumentException("Failed to wrap input stream", e);
52-
}
53-
break;
47+
// case BROTLI:
48+
// chOutput = new WrappedOutputStream(file,
49+
// CompressionUtils.createBrotliOutputStream(output, compressionLevel,
50+
// bufferSize), bufferSize, postCloseAction);
51+
// break;
52+
// case BZ2:
53+
// chOutput = new WrappedOutputStream(file,
54+
// CompressionUtils.createBz2OutputStream(output, compressionLevel), bufferSize,
55+
// postCloseAction);
56+
// break;
57+
// case DEFLATE:
58+
// chOutput = new WrappedOutputStream(file, new InflaterOutputStream(output),
59+
// bufferSize,
60+
// postCloseAction);
61+
// break;
62+
// case GZIP:
63+
// try {
64+
// GzipParameters params = new GzipParameters();
65+
// params.setBufferSize(bufferSize);
66+
// params.setCompressionLevel(3);
67+
// chOutput = new WrappedOutputStream(file, new
68+
// GzipCompressorOutputStream(output, params),
69+
// bufferSize, postCloseAction);
70+
// } catch (IOException e) {
71+
// throw new
72+
// IllegalArgumentException(CompressionUtils.ERROR_FAILED_TO_WRAP_OUTPUT, e);
73+
// }
74+
// break;
5475
case LZ4:
55-
chOutput = new Lz4OutputStream(file, output, bufferSize, postCloseAction);
76+
chOutput = new Lz4OutputStream(file, output, compressionLevel, bufferSize, postCloseAction);
77+
break;
78+
case SNAPPY:
79+
chOutput = new WrappedOutputStream(file,
80+
CompressionUtils.createSnappyOutputStream(output, compressionLevel), bufferSize,
81+
postCloseAction);
82+
break;
83+
case ZSTD:
84+
chOutput = new WrappedOutputStream(file,
85+
CompressionUtils.createZstdOutputStream(output, compressionLevel), bufferSize,
86+
postCloseAction);
5687
break;
88+
// case XZ:
89+
// chOutput = new WrappedOutputStream(file,
90+
// CompressionUtils.createXzOutputStream(output, 6), bufferSize,
91+
// postCloseAction);
92+
// break;
5793
default:
5894
throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression);
5995
}
@@ -101,7 +137,7 @@ public static ClickHouseOutputStream of(ClickHouseFile file, int bufferSize, Run
101137
* {@link ClickHouseOutputStream}
102138
*/
103139
public static ClickHouseOutputStream of(OutputStream output) {
104-
return of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), null, null);
140+
return of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), null, -1, null);
105141
}
106142

107143
/**
@@ -114,7 +150,7 @@ public static ClickHouseOutputStream of(OutputStream output) {
114150
* {@link ClickHouseOutputStream}
115151
*/
116152
public static ClickHouseOutputStream of(OutputStream output, int bufferSize) {
117-
return of(output, bufferSize, null, null);
153+
return of(output, bufferSize, null, -1, null);
118154
}
119155

120156
/**
@@ -126,13 +162,14 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize) {
126162
* @param compression compression algorithm, null or
127163
* {@link ClickHouseCompression#NONE} means no
128164
* compression
165+
* @param level compression level
129166
* @param postCloseAction custom action will be performed right after closing
130167
* the output stream
131168
* @return wrapped output, or the same output if it's instance of
132169
* {@link ClickHouseOutputStream}
133170
*/
134171
public static ClickHouseOutputStream of(OutputStream output, int bufferSize, ClickHouseCompression compression,
135-
Runnable postCloseAction) {
172+
int level, Runnable postCloseAction) {
136173
final ClickHouseOutputStream chOutput;
137174
if (output == null) {
138175
chOutput = EmptyOutputStream.INSTANCE;
@@ -141,7 +178,7 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize, Cli
141178
? (ClickHouseOutputStream) output
142179
: new WrappedOutputStream(null, output, bufferSize, postCloseAction);
143180
} else {
144-
chOutput = wrap(null, output, bufferSize, postCloseAction, compression, 0);
181+
chOutput = wrap(null, output, bufferSize, postCloseAction, compression, level);
145182
}
146183
return chOutput;
147184
}

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -819,12 +819,6 @@ public SelfT compressServerResponse(boolean enable, ClickHouseCompression compre
819819
: ClickHouseCompression.NONE;
820820
}
821821

822-
if (compressLevel < 0) {
823-
compressLevel = 0;
824-
} else if (compressLevel > 9) {
825-
compressLevel = 9;
826-
}
827-
828822
return option(ClickHouseClientOption.COMPRESS, enable)
829823
.option(ClickHouseClientOption.COMPRESS_ALGORITHM, compressAlgorithm)
830824
.option(ClickHouseClientOption.COMPRESS_LEVEL, compressLevel);
@@ -894,12 +888,6 @@ public SelfT decompressClientRequest(boolean enable, ClickHouseCompression compr
894888
: ClickHouseCompression.NONE;
895889
}
896890

897-
if (compressLevel < 0) {
898-
compressLevel = 0;
899-
} else if (compressLevel > 9) {
900-
compressLevel = 9;
901-
}
902-
903891
return option(ClickHouseClientOption.DECOMPRESS, enable)
904892
.option(ClickHouseClientOption.DECOMPRESS_ALGORITHM, compressAlgorithm)
905893
.option(ClickHouseClientOption.DECOMPRESS_LEVEL, compressLevel);

clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,11 @@ public enum ClickHouseClientOption implements ClickHouseOption {
140140
/**
141141
* Compression level for compressing server response.
142142
*/
143-
COMPRESS_LEVEL("compress_level", 3, "Compression level for response, from 0 to 9(low to high)"),
143+
COMPRESS_LEVEL("compress_level", -1, "Compression level for response, -1 standards for default"),
144144
/**
145145
* Compression level for decompress client request.
146146
*/
147-
DECOMPRESS_LEVEL("decompress_level", 3, "Compression level for request, from 0 to 9(low to high)"),
147+
DECOMPRESS_LEVEL("decompress_level", -1, "Compression level for request, -1 standards for default"),
148148

149149
/**
150150
* Connection timeout in milliseconds.

0 commit comments

Comments
 (0)