Skip to content

support more compression algorithms #1174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions clickhouse-client/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -18,7 +20,7 @@
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>org.roaringbitmap</artifactId>
<scope>provided</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>*</groupId>
Expand All @@ -27,50 +29,70 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>brotli4j</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.brotli</groupId>
<artifactId>dec</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ static ClickHouseOutputStream getRequestOutputStream(ClickHouseConfig config, Ou
Runnable postCloseAction) {
if (config == null) {
return ClickHouseOutputStream.of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(),
ClickHouseCompression.NONE, postCloseAction);
ClickHouseCompression.NONE, -1, postCloseAction);
}

return ClickHouseOutputStream.of(output, config.getWriteBufferSize(), config.getRequestCompressAlgorithm(),
postCloseAction);
config.getRequestCompressLevel(), postCloseAction);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public enum ClickHouseCompression {
BZ2("application/x-bzip2", "bz2", "bz2"),
DEFLATE("application/deflate", "deflate", "zz"),
GZIP("application/gzip", "gzip", "gz"),
LZMA("application/x-lzma", "lzma", "xz"),
LZ4("application/x-lz4", "lz4", "lz4"),
ZIP("application/zip", "zip", "zip"),
SNAPPY("application/x-snappy", "snappy", "sz"),
XZ("application/x-xz", "xz", "xz"),
ZSTD("application/zstd", "zstd", "zst");

private String mimeType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,12 @@ public ClickHouseCompression getResponseCompressAlgorithm() {

/**
* Gets input compress level. When {@link #isResponseCompressed()} is
* {@code false}, this will return {@code 0}.
* {@code false}, this will return {@code -1}.
*
* @return compress level
*/
public int getResponseCompressLevel() {
return decompressResponse ? decompressLevel : 0;
return decompressResponse ? decompressLevel : -1;
}

/**
Expand All @@ -407,12 +407,12 @@ public ClickHouseCompression getRequestCompressAlgorithm() {

/**
* Gets input compress level. When {@link #isRequestCompressed()} is
* {@code false}, this will return {@code 0}.
* {@code false}, this will return {@code -1}.
*
* @return compress level
*/
public int getRequestCompressLevel() {
return compressRequest ? compressLevel : 0;
return compressRequest ? compressLevel : -1;
}

public int getConnectionTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public ClickHouseCompression getCompressionAlgorithm() {
/**
* Gets compression level.
*
* @return compression level, which is always greater than or equal to zero
* @return compression level, which in general should be greater than or equal
* to zero
*/
public int getCompressionLevel() {
return compressLevel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.stream.BlockingInputStream;
import com.clickhouse.client.stream.CompressionUtils;
import com.clickhouse.client.stream.DeferredInputStream;
import com.clickhouse.client.stream.EmptyInputStream;
import com.clickhouse.client.stream.Lz4InputStream;
Expand Down Expand Up @@ -71,18 +73,42 @@ public static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input,
: new WrappedInputStream(file, input, bufferSize, postCloseAction);
} else {
switch (compression) {
case BROTLI:
chInput = new WrappedInputStream(file, CompressionUtils.createBrotliInputStream(input, bufferSize),
bufferSize, postCloseAction);
break;
case BZ2:
chInput = new WrappedInputStream(file, CompressionUtils.createBz2InputStream(input), bufferSize,
postCloseAction);
break;
case DEFLATE:
chInput = new WrappedInputStream(file, new InflaterInputStream(input), bufferSize, postCloseAction);
break;
case GZIP:
try {
chInput = new WrappedInputStream(file, new GZIPInputStream(input), bufferSize, postCloseAction);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to wrap input stream", e);
throw new IllegalArgumentException(CompressionUtils.ERROR_FAILED_TO_WRAP_INPUT, e);
}
break;
case LZ4:
chInput = new Lz4InputStream(file, input, postCloseAction);
break;
case SNAPPY:
// https://github.com/ClickHouse/ClickHouse/issues/44885
chInput = new WrappedInputStream(file, CompressionUtils.createSnappyInputStream(input),
bufferSize, postCloseAction);
break;
case ZSTD:
chInput = new WrappedInputStream(file, CompressionUtils.createZstdInputStream(input), bufferSize,
postCloseAction);
break;
case XZ:
chInput = new WrappedInputStream(file, CompressionUtils.createXzInputStream(input), bufferSize,
postCloseAction);
break;
default:
throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression);
throw new UnsupportedOperationException("Unsupported decompression algorithm: " + compression);
}
}
return chInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.stream.CompressionUtils;
import com.clickhouse.client.stream.EmptyOutputStream;
import com.clickhouse.client.stream.Lz4OutputStream;
import com.clickhouse.client.stream.WrappedOutputStream;
Expand Down Expand Up @@ -42,18 +42,54 @@ static ClickHouseOutputStream wrap(ClickHouseFile file, OutputStream output, int
if (compression == null || compression == ClickHouseCompression.NONE) {
chOutput = new WrappedOutputStream(file, output, bufferSize, postCloseAction);
} else {
// never got brotli, bz2, deflate, gzip, and xz working :<
switch (compression) {
case GZIP:
try {
chOutput = new WrappedOutputStream(file, new GZIPOutputStream(output), bufferSize,
postCloseAction);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to wrap input stream", e);
}
break;
// case BROTLI:
// chOutput = new WrappedOutputStream(file,
// CompressionUtils.createBrotliOutputStream(output, compressionLevel,
// bufferSize), bufferSize, postCloseAction);
// break;
// case BZ2:
// chOutput = new WrappedOutputStream(file,
// CompressionUtils.createBz2OutputStream(output, compressionLevel), bufferSize,
// postCloseAction);
// break;
// case DEFLATE:
// chOutput = new WrappedOutputStream(file, new InflaterOutputStream(output),
// bufferSize,
// postCloseAction);
// break;
// case GZIP:
// try {
// GzipParameters params = new GzipParameters();
// params.setBufferSize(bufferSize);
// params.setCompressionLevel(3);
// chOutput = new WrappedOutputStream(file, new
// GzipCompressorOutputStream(output, params),
// bufferSize, postCloseAction);
// } catch (IOException e) {
// throw new
// IllegalArgumentException(CompressionUtils.ERROR_FAILED_TO_WRAP_OUTPUT, e);
// }
// break;
case LZ4:
chOutput = new Lz4OutputStream(file, output, bufferSize, postCloseAction);
chOutput = new Lz4OutputStream(file, output, compressionLevel, bufferSize, postCloseAction);
break;
case SNAPPY:
chOutput = new WrappedOutputStream(file,
CompressionUtils.createSnappyOutputStream(output, compressionLevel), bufferSize,
postCloseAction);
break;
case ZSTD:
chOutput = new WrappedOutputStream(file,
CompressionUtils.createZstdOutputStream(output, compressionLevel), bufferSize,
postCloseAction);
break;
// case XZ:
// chOutput = new WrappedOutputStream(file,
// CompressionUtils.createXzOutputStream(output, 6), bufferSize,
// postCloseAction);
// break;
default:
throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression);
}
Expand Down Expand Up @@ -101,7 +137,7 @@ public static ClickHouseOutputStream of(ClickHouseFile file, int bufferSize, Run
* {@link ClickHouseOutputStream}
*/
public static ClickHouseOutputStream of(OutputStream output) {
return of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), null, null);
return of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), null, -1, null);
}

/**
Expand All @@ -114,7 +150,7 @@ public static ClickHouseOutputStream of(OutputStream output) {
* {@link ClickHouseOutputStream}
*/
public static ClickHouseOutputStream of(OutputStream output, int bufferSize) {
return of(output, bufferSize, null, null);
return of(output, bufferSize, null, -1, null);
}

/**
Expand All @@ -126,13 +162,14 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize) {
* @param compression compression algorithm, null or
* {@link ClickHouseCompression#NONE} means no
* compression
* @param level compression level
* @param postCloseAction custom action will be performed right after closing
* the output stream
* @return wrapped output, or the same output if it's instance of
* {@link ClickHouseOutputStream}
*/
public static ClickHouseOutputStream of(OutputStream output, int bufferSize, ClickHouseCompression compression,
Runnable postCloseAction) {
int level, Runnable postCloseAction) {
final ClickHouseOutputStream chOutput;
if (output == null) {
chOutput = EmptyOutputStream.INSTANCE;
Expand All @@ -141,7 +178,7 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize, Cli
? (ClickHouseOutputStream) output
: new WrappedOutputStream(null, output, bufferSize, postCloseAction);
} else {
chOutput = wrap(null, output, bufferSize, postCloseAction, compression, 0);
chOutput = wrap(null, output, bufferSize, postCloseAction, compression, level);
}
return chOutput;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,12 +819,6 @@ public SelfT compressServerResponse(boolean enable, ClickHouseCompression compre
: ClickHouseCompression.NONE;
}

if (compressLevel < 0) {
compressLevel = 0;
} else if (compressLevel > 9) {
compressLevel = 9;
}

return option(ClickHouseClientOption.COMPRESS, enable)
.option(ClickHouseClientOption.COMPRESS_ALGORITHM, compressAlgorithm)
.option(ClickHouseClientOption.COMPRESS_LEVEL, compressLevel);
Expand Down Expand Up @@ -894,12 +888,6 @@ public SelfT decompressClientRequest(boolean enable, ClickHouseCompression compr
: ClickHouseCompression.NONE;
}

if (compressLevel < 0) {
compressLevel = 0;
} else if (compressLevel > 9) {
compressLevel = 9;
}

return option(ClickHouseClientOption.DECOMPRESS, enable)
.option(ClickHouseClientOption.DECOMPRESS_ALGORITHM, compressAlgorithm)
.option(ClickHouseClientOption.DECOMPRESS_LEVEL, compressLevel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ public enum ClickHouseClientOption implements ClickHouseOption {
/**
* Compression level for compressing server response.
*/
COMPRESS_LEVEL("compress_level", 3, "Compression level for response, from 0 to 9(low to high)"),
COMPRESS_LEVEL("compress_level", -1, "Compression level for response, -1 standards for default"),
/**
* Compression level for decompress client request.
*/
DECOMPRESS_LEVEL("decompress_level", 3, "Compression level for request, from 0 to 9(low to high)"),
DECOMPRESS_LEVEL("decompress_level", -1, "Compression level for request, -1 standards for default"),

/**
* Connection timeout in milliseconds.
Expand Down
Loading