Skip to content

Commit 9d7b7ac

Browse files
committed
override readCustom to stop reusing same byte array for mutliple reads
1 parent 6d60540 commit 9d7b7ac

File tree

1 file changed

+43
-0
lines changed

1 file changed

+43
-0
lines changed

clickhouse-data/src/main/java/com/clickhouse/data/stream/Lz4InputStream.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
import java.io.InputStream;
55
import java.io.InvalidObjectException;
66
import java.io.StreamCorruptedException;
7+
import java.util.Arrays;
8+
import java.util.LinkedList;
79

810
import com.clickhouse.data.ClickHouseByteUtils;
911
import com.clickhouse.data.ClickHouseByteBuffer;
1012
import com.clickhouse.data.ClickHouseChecker;
1113
import com.clickhouse.data.ClickHouseCityHash;
14+
import com.clickhouse.data.ClickHouseDataUpdater;
1215
import com.clickhouse.data.ClickHouseInputStream;
1316
import com.clickhouse.data.ClickHousePassThruStream;
1417
import com.clickhouse.data.ClickHouseUtils;
@@ -105,6 +108,46 @@ public Lz4InputStream(ClickHousePassThruStream stream, InputStream input, Runnab
105108
this.compressedBlock = ClickHouseByteBuffer.EMPTY_BYTES;
106109
}
107110

111+
@Override
112+
public ClickHouseByteBuffer readCustom(ClickHouseDataUpdater reader) throws IOException {
113+
if (reader == null) {
114+
return byteBuffer.reset();
115+
}
116+
ensureOpen();
117+
118+
LinkedList<byte[]> list = new LinkedList<>();
119+
int length = 0;
120+
boolean more = true;
121+
while (more) {
122+
int remain = limit - position;
123+
if (remain < 1) {
124+
closeQuietly();
125+
more = false;
126+
} else {
127+
int read = reader.update(buffer, position, limit);
128+
if (read == -1) {
129+
list.add(Arrays.copyOfRange(buffer, position, limit));
130+
length += remain;
131+
position = limit;
132+
if (updateBuffer() < 1) {
133+
closeQuietly();
134+
more = false;
135+
}
136+
} else {
137+
if (read > 0) {
138+
byte[] bytes = new byte[read];
139+
System.arraycopy(buffer, position, bytes, 0, read);
140+
list.add(bytes);
141+
length += read;
142+
position += read;
143+
}
144+
more = false;
145+
}
146+
}
147+
}
148+
return byteBuffer.update(list, 0, length);
149+
}
150+
108151
@Override
109152
public void close() throws IOException {
110153
if (!closed) {

0 commit comments

Comments
 (0)