Commit c63a34d

mzitnik authored
Fix decompress bug in NonBlockingPipedOutputStream (#1542)
* Fix decompress bug in NonBlockingPipedOutputStream & extra logging
* Use StringUtils to make large content
* Move test to http client test

Co-authored-by: mzitnik <[email protected]>
Parent: bbd6808 · Commit: c63a34d
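
For context on what actually failed: the commit targets inserts that stream a client-compressed request body (decompressClientRequest(true)) to ClickHouse over HTTP through a piped output stream, carrying a single value of about 50 KB. The sketch below is condensed from the regression test testDecompressWithLargeChunk added to ClickHouseHttpClientTest in this commit; getServer(), getClient(), the target table and the imports come from that test class and are assumed here.

// Condensed from testDecompressWithLargeChunk() in the diff below; helpers such as
// getServer()/getClient() and the table test_decompress_with_large_chunk (id Int64, raw String)
// are provided by the test class and assumed to exist.
public void insertLargeRowWithClientCompression() throws Exception {
    ClickHouseNode server = getServer();
    try (ClickHouseClient client = getClient()) {
        ClickHouseRequest.Mutation request = client.read(server)
                .write()
                .table("test_decompress_with_large_chunk")
                .decompressClientRequest(true) // compress the request body on the client; the server decompresses it
                .format(ClickHouseFormat.RowBinary);

        CompletableFuture<ClickHouseResponse> future;
        try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                .createPipedOutputStream(request.getConfig())) {
            // a worker thread reads from the piped input stream and feeds the HTTP request body
            future = request.data(stream.getInputStream()).execute();
            // a single row whose String value (~50 KB) is written through the piped stream
            BinaryStreamUtils.writeInt64(stream, 0L);
            BinaryStreamUtils.writeString(stream, StringUtils.repeat("*", 50000));
        } // the stream must be closed before waiting on the response
        try (ClickHouseResponse response = future.get()) {
            // per the CHANGELOG entry, this insert previously failed when the decompress
            // flag was used with the HTTP client; getSummary().getWrittenRows() should be 1
        }
    }
}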

File tree: 6 files changed, +87 -1 lines

  CHANGELOG.md
  clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java
  clickhouse-data/src/main/java/com/clickhouse/data/stream/AbstractByteArrayOutputStream.java
  clickhouse-data/src/main/java/com/clickhouse/data/stream/Lz4OutputStream.java
  clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java
  clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 ## 0.6.1
 
 ### Bug Fixes
+- Fix buffering issue caused by decompress flag not to work when working with HTTP Client.
 
 ## 0.6.0
 

clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@
 import com.clickhouse.data.value.UnsignedShort;
 
 import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
 import org.testng.Assert;
 import org.testng.SkipException;
 import org.testng.annotations.DataProvider;

clickhouse-data/src/main/java/com/clickhouse/data/stream/AbstractByteArrayOutputStream.java

Lines changed: 4 additions & 0 deletions
@@ -6,8 +6,11 @@
 import com.clickhouse.data.ClickHouseDataUpdater;
 import com.clickhouse.data.ClickHousePassThruStream;
 import com.clickhouse.data.ClickHouseOutputStream;
+import com.clickhouse.logging.Logger;
+import com.clickhouse.logging.LoggerFactory;
 
 public abstract class AbstractByteArrayOutputStream extends ClickHouseOutputStream {
+    private static final Logger log = LoggerFactory.getLogger(AbstractByteArrayOutputStream.class);
     protected final byte[] buffer;
 
     protected int position;
@@ -72,6 +75,7 @@ public ClickHouseOutputStream writeBuffer(ClickHouseByteBuffer buffer) throws IO
         byte[] b = this.buffer;
         int limit = b.length;
         int length = buffer.length();
+        log.debug("writeBuffer limit:[{}] length:[{}] position:[{}]", limit, length, position);
         if (length <= limit - position) {
             System.arraycopy(buffer.array(), buffer.position(), b, position, length);
             position += length;

clickhouse-data/src/main/java/com/clickhouse/data/stream/Lz4OutputStream.java

Lines changed: 9 additions & 1 deletion
@@ -9,19 +9,26 @@
 import com.clickhouse.data.ClickHouseOutputStream;
 import com.clickhouse.data.ClickHousePassThruStream;
 
+import com.clickhouse.logging.Logger;
+import com.clickhouse.logging.LoggerFactory;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 
 public class Lz4OutputStream extends AbstractByteArrayOutputStream {
     private static final LZ4Factory factory = LZ4Factory.fastestInstance();
-
+    private static final Logger log = LoggerFactory.getLogger(Lz4OutputStream.class);
     private final OutputStream output;
 
     private final LZ4Compressor compressor;
     private final byte[] compressedBlock;
 
     @Override
     protected void flushBuffer() throws IOException {
+        log.debug("flushBuffer [{}:{}]", 0, position);
+        if (position == 0) {
+            log.debug("flushBuffer: nothing to flush");
+            return;
+        }
         byte[] block = compressedBlock;
         block[16] = Lz4InputStream.MAGIC;
         int compressed = compressor.compress(buffer, 0, position, block, 25);
@@ -37,6 +44,7 @@ protected void flushBuffer() throws IOException {
 
     @Override
     protected void flushBuffer(byte[] bytes, int offset, int length) throws IOException {
+        log.debug("flushBuffer [{}:{}]", offset, length);
         int maxLen = compressor.maxCompressedLength(length) + 15;
         byte[] block = maxLen <= compressedBlock.length ? compressedBlock : new byte[maxLen];
         block[16] = Lz4InputStream.MAGIC;
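
The logging lines aside, the early return at the top of flushBuffer() is the functional change in this file: a flush that finds position == 0 is now a no-op, whereas before it would compress a zero-length input and still write a framed block with an empty payload, which appears to be what broke decompression of client-compressed HTTP uploads. A minimal sketch of the guarded flush, with the remainder of the method elided (identifiers follow the diff; the comments are illustrative):

// Sketch only: identifiers follow the diff above; the tail of the method is elided.
protected void flushBuffer() throws IOException {
    if (position == 0) {
        // nothing is buffered, so skip the flush entirely; without this guard a block
        // with an empty payload would still be framed and written to the wrapped stream
        return;
    }
    byte[] block = compressedBlock;
    block[16] = Lz4InputStream.MAGIC;                                     // method/magic byte of the block header
    int compressed = compressor.compress(buffer, 0, position, block, 25); // compressed payload starts after the header
    // ... the rest of the original method continues unchanged ...
}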

clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java

Lines changed: 6 additions & 0 deletions
@@ -15,6 +15,8 @@
 import com.clickhouse.data.ClickHousePipedOutputStream;
 import com.clickhouse.data.ClickHouseUtils;
 import com.clickhouse.data.ClickHouseWriter;
+import com.clickhouse.logging.Logger;
+import com.clickhouse.logging.LoggerFactory;
 
 /**
  * A combination of {@link java.io.PipedOutputStream} and
@@ -23,6 +25,9 @@
  * reader are on two separate threads.
  */
 public class NonBlockingPipedOutputStream extends ClickHousePipedOutputStream {
+
+    private static final Logger log = LoggerFactory.getLogger(NonBlockingPipedOutputStream.class);
+
     protected final AdaptiveQueue<ByteBuffer> queue;
 
     protected final int bufferSize;
@@ -176,6 +181,7 @@ public ClickHouseOutputStream writeBytes(byte[] bytes, int offset, int length) t
         ByteBuffer b = buffer;
         while (length > 0) {
             int remain = b.remaining();
+            log.debug("writeBytes length:[%d] remain:[%d] offset: [%d]", length, remain, offset);
             if (length < remain) {
                 b.put(bytes, offset, length);
                 length = 0;

clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java

Lines changed: 66 additions & 0 deletions
@@ -3,6 +3,9 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.LongStream;
 
 import com.clickhouse.client.ClickHouseClient;
 import com.clickhouse.client.ClickHouseConfig;
@@ -27,16 +30,20 @@
 import com.clickhouse.client.http.config.HttpConnectionProvider;
 import com.clickhouse.config.ClickHouseOption;
 import com.clickhouse.data.ClickHouseCompression;
+import com.clickhouse.data.ClickHouseDataStreamFactory;
 import com.clickhouse.data.ClickHouseExternalTable;
 import com.clickhouse.data.ClickHouseFormat;
 import com.clickhouse.data.ClickHouseInputStream;
+import com.clickhouse.data.ClickHousePipedOutputStream;
 import com.clickhouse.data.ClickHouseRecord;
 import com.clickhouse.data.ClickHouseVersion;
+import com.clickhouse.data.format.BinaryStreamUtils;
 import com.clickhouse.data.value.ClickHouseStringValue;
 
 import eu.rekawek.toxiproxy.ToxiproxyClient;
 
 import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -469,4 +476,63 @@ public void testProxyConnection() throws ClickHouseException, IOException {
             }
         }
     }
+    @Test(groups = "integration")
+    public void testDecompressWithLargeChunk() throws ClickHouseException, IOException, ExecutionException, InterruptedException {
+        ClickHouseNode server = getServer();
+
+        String tableName = "test_decompress_with_large_chunk";
+
+        String tableColumns = String.format("id Int64, raw String");
+        sendAndWait(server, "drop table if exists " + tableName,
+                "create table " + tableName + " (" + tableColumns + ")engine=Memory");
+
+        long numRows = 1;
+        String content = StringUtils.repeat("*", 50000);
+        try {
+            try (ClickHouseClient client = getClient()) {
+                ClickHouseRequest.Mutation request = client.read(server)
+                        .write()
+                        .table(tableName)
+                        .decompressClientRequest(true)
+                        //.option(ClickHouseClientOption.USE_BLOCKING_QUEUE, "true")
+                        .format(ClickHouseFormat.RowBinary);
+                ClickHouseConfig config = request.getConfig();
+                CompletableFuture<ClickHouseResponse> future;
+
+                try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
+                        .createPipedOutputStream(config)) {
+                    // start the worker thread which transfer data from the input into ClickHouse
+                    future = request.data(stream.getInputStream()).execute();
+                    // write bytes into the piped stream
+                    LongStream.range(0, numRows).forEachOrdered(
+                            n -> {
+                                try {
+                                    BinaryStreamUtils.writeInt64(stream, n);
+                                    BinaryStreamUtils.writeString(stream, content);
+                                } catch (IOException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                    );
+
+                    // We need to close the stream before getting a response
+                    stream.close();
+                    try (ClickHouseResponse response = future.get()) {
+                        ClickHouseResponseSummary summary = response.getSummary();
+                        Assert.assertEquals(summary.getWrittenRows(), numRows, "Num of written rows");
+                    }
+                }
+
+            }
+        } catch (Exception e) {
+            Throwable th = e.getCause();
+            // if (th instanceof ClickHouseException) {
+            // ClickHouseException ce = (ClickHouseException) th;
+            // Assert.assertEquals(73, ce.getErrorCode(), "It's Code: 73. DB::Exception: Unknown format RowBinaryWithDefaults. a server that not support the format");
+            // } else {
+            Assert.assertTrue(false, e.getMessage());
+            // }
+        }
+
+    }
 }
