Skip to content

Commit a783804

Browse files
authored
Merge pull request #1238 from zhicwu/main
Enhance input/output stream and fix grpc test failure
2 parents 965a97a + 017cd97 commit a783804

31 files changed

+691
-590
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNodeSelector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public boolean match(ClickHouseNode node) {
192192
* Checks if the given protocol matches any of the preferred protocols.
193193
*
194194
* @param protocol protocol to check
195-
* @return true if the protocl matches at least one of preferred protocols;
195+
* @return true if the protocol matches at least one of preferred protocols;
196196
* false otherwise
197197
*/
198198
public boolean matchAnyOfPreferredProtocols(ClickHouseProtocol protocol) {

clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ public void testInitialization() {
415415
for (ClickHouseClient client : new ClickHouseClient[] { client1, client2, client3, client4, client5 }) {
416416
Assert.assertEquals(client.getClass(), Agent.class);
417417
Assert.assertEquals(((Agent) client).getClient().getClass(), getClientClass());
418-
Assert.assertTrue(client.accept(getProtocol()), "The client should support protocl: " + getProtocol());
418+
Assert.assertTrue(client.accept(getProtocol()), "The client should support protocol: " + getProtocol());
419419
}
420420
}
421421
}
@@ -1541,6 +1541,8 @@ public void testDumpFile(boolean gzipCompressed, boolean useOneLiner)
15411541
}
15421542
}
15431543
Assert.assertNotNull(summary);
1544+
long fileSize = Files.size(file.toPath());
1545+
Assert.assertTrue(fileSize > 0L, "Expects an non-empty file being created");
15441546
try (InputStream in = gzipCompressed ? new GZIPInputStream(new FileInputStream(file))
15451547
: new FileInputStream(file); ByteArrayOutputStream out = new ByteArrayOutputStream()) {
15461548
ClickHouseInputStream.pipe(in, out, 512);

clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseByteBuffer.java

+71
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.Serializable;
44
import java.math.BigInteger;
5+
import java.nio.Buffer;
56
import java.nio.ByteBuffer;
67
import java.nio.charset.Charset;
78
import java.nio.charset.StandardCharsets;
@@ -36,6 +37,31 @@ public static ClickHouseByteBuffer newInstance() {
3637
return new ClickHouseByteBuffer(EMPTY_BYTES, 0, 0);
3738
}
3839

40+
/**
41+
* Wraps given byte buffer.
42+
*
43+
* @param buffer byte buffer
44+
* @return non-null wrapped byte buffer
45+
*/
46+
public static ClickHouseByteBuffer of(ByteBuffer buffer) {
47+
if (buffer == null || !buffer.hasRemaining()) {
48+
return newInstance();
49+
}
50+
51+
int pos = buffer.position();
52+
int len = buffer.remaining();
53+
byte[] bytes;
54+
if (buffer.hasArray()) {
55+
bytes = buffer.array();
56+
} else {
57+
bytes = new byte[len];
58+
buffer.get(bytes);
59+
((Buffer) buffer).position(pos);
60+
pos = 0;
61+
}
62+
return new ClickHouseByteBuffer(bytes, pos, len);
63+
}
64+
3965
/**
4066
* Wraps given byte array as byte buffer.
4167
*
@@ -546,6 +572,31 @@ public ClickHouseByteBuffer slice(int offset, int length) {
546572
return ClickHouseByteBuffer.of(array, offset + position, length);
547573
}
548574

575+
/**
576+
* Updates buffer.
577+
*
578+
* @param buffer byte buffer
579+
* @return this byte buffer
580+
*/
581+
public ClickHouseByteBuffer update(ByteBuffer buffer) {
582+
if (buffer == null || !buffer.hasRemaining()) {
583+
return reset();
584+
}
585+
586+
int pos = buffer.position();
587+
int len = buffer.remaining();
588+
byte[] bytes;
589+
if (buffer.hasArray()) {
590+
bytes = buffer.array();
591+
} else {
592+
bytes = new byte[len];
593+
buffer.get(bytes);
594+
((Buffer) buffer).position(pos);
595+
pos = 0;
596+
}
597+
return update(bytes, pos, len);
598+
}
599+
549600
/**
550601
* Updates buffer.
551602
*
@@ -659,6 +710,26 @@ public byte[] array() {
659710
return array;
660711
}
661712

713+
/**
714+
* Creates a copy of the current byte buffer.
715+
*
716+
* @param deep true to copy the underlying byte array; false to reuse
717+
* @return non-null copy of the current byte buffer
718+
*/
719+
public ClickHouseByteBuffer copy(boolean deep) {
720+
byte[] bytes;
721+
int pos;
722+
if (deep) {
723+
bytes = new byte[length];
724+
pos = 0;
725+
System.arraycopy(array, position, bytes, 0, length);
726+
} else {
727+
bytes = array;
728+
pos = position;
729+
}
730+
return ClickHouseByteBuffer.of(bytes, pos, length);
731+
}
732+
662733
public byte firstByte() {
663734
return array[position];
664735
}

clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataProcessor.java

+15
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import java.io.EOFException;
44
import java.io.IOException;
55
import java.io.Serializable;
6+
import java.io.StreamCorruptedException;
67
import java.io.UncheckedIOException;
8+
import java.nio.charset.StandardCharsets;
79
import java.util.ArrayList;
810
import java.util.Arrays;
911
import java.util.Collections;
@@ -130,6 +132,19 @@ private ClickHouseRecord nextRecord() throws NoSuchElementException, UncheckedIO
130132
final ClickHouseRecord r = config.isReuseValueWrapper() ? currentRecord : currentRecord.copy();
131133
try {
132134
readAndFill(r);
135+
} catch (StreamCorruptedException e) {
136+
byte[] search = "ode: ".getBytes(StandardCharsets.US_ASCII);
137+
byte[] bytes = input.getBuffer().array();
138+
int index = ClickHouseUtils.indexOf(bytes, search);
139+
if (index > 0 && bytes[--index] == (byte) 'C') {
140+
throw new UncheckedIOException(new String(bytes, index, bytes.length - index, StandardCharsets.UTF_8),
141+
e);
142+
} else {
143+
throw new UncheckedIOException(
144+
ClickHouseUtils.format(ERROR_FAILED_TO_READ, readPosition + 1, columns.length,
145+
columns[readPosition]),
146+
e);
147+
}
133148
} catch (EOFException e) {
134149
if (readPosition == 0) { // end of the stream, which is fine
135150
throw new NoSuchElementException("No more record");

clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseInputStream.java

+76-11
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@
77
import java.io.IOException;
88
import java.io.InputStream;
99
import java.io.OutputStream;
10+
import java.io.StreamCorruptedException;
1011
import java.io.UncheckedIOException;
1112
import java.net.URL;
1213
import java.nio.ByteBuffer;
1314
import java.nio.charset.Charset;
1415
import java.nio.charset.StandardCharsets;
1516
import java.util.Arrays;
1617
import java.util.HashMap;
18+
import java.util.Iterator;
1719
import java.util.Map;
20+
import java.util.NoSuchElementException;
1821
import java.util.concurrent.BlockingQueue;
1922
import java.util.concurrent.CompletableFuture;
2023
import java.util.concurrent.ExecutionException;
@@ -24,7 +27,6 @@
2427

2528
import com.clickhouse.data.stream.BlockingInputStream;
2629
import com.clickhouse.data.stream.DeferredInputStream;
27-
import com.clickhouse.data.stream.DelegatedInputStream;
2830
import com.clickhouse.data.stream.EmptyInputStream;
2931
import com.clickhouse.data.stream.RestrictedInputStream;
3032
import com.clickhouse.data.stream.IterableByteArrayInputStream;
@@ -40,14 +42,45 @@
4042
* creation as well as closing the stream when it reaches end of stream. This
4143
* class is also responsible for creating various input stream as needed.
4244
*/
43-
public abstract class ClickHouseInputStream extends InputStream {
45+
public abstract class ClickHouseInputStream extends InputStream implements Iterable<ClickHouseByteBuffer> {
4446
protected static final String ERROR_INCOMPLETE_READ = "Reached end of input stream after reading %d of %d bytes";
4547
protected static final String ERROR_NULL_BYTES = "Non-null byte array is required";
4648
protected static final String ERROR_REUSE_BUFFER = "Please pass a different byte array instead of the same internal buffer for reading";
4749
protected static final String ERROR_STREAM_CLOSED = "Input stream has been closed";
4850

4951
public static final String TYPE_NAME = "InputStream";
5052

53+
static class ByteBufferIterator implements Iterator<ClickHouseByteBuffer> {
54+
private final ClickHouseInputStream input;
55+
56+
private ByteBufferIterator(ClickHouseInputStream input) {
57+
this.input = input;
58+
}
59+
60+
@Override
61+
public boolean hasNext() {
62+
try {
63+
return input.available() > 0;
64+
} catch (IOException e) {
65+
throw new UncheckedIOException(e);
66+
}
67+
}
68+
69+
@Override
70+
public ClickHouseByteBuffer next() {
71+
try {
72+
ClickHouseByteBuffer buffer = input.nextBuffer();
73+
if (buffer.isEmpty() && input.available() < 1) {
74+
throw new NoSuchElementException(
75+
"No more byte buffer for read as we reached end of the stream");
76+
}
77+
return buffer;
78+
} catch (IOException e) {
79+
throw new UncheckedIOException(e);
80+
}
81+
}
82+
}
83+
5184
/**
5285
* Wraps the given input stream.
5386
*
@@ -733,6 +766,22 @@ protected void ensureOpen() throws IOException {
733766
}
734767
}
735768

769+
/**
770+
* Gets reference to current byte buffer.
771+
*
772+
* @return non-null byte buffer
773+
*/
774+
protected abstract ClickHouseByteBuffer getBuffer();
775+
776+
/**
777+
* Gets reference to next byte buffer available for read. An empty byte buffer
778+
* will be returned ({@code nextBuffer().isEmpty() == true}), when it reaches
779+
* end of the input stream.
780+
*
781+
* @return non-null byte buffer
782+
*/
783+
protected abstract ClickHouseByteBuffer nextBuffer() throws IOException;
784+
736785
/**
737786
* Gets underlying file. Same as
738787
* {@code ClickHouseFile.of(getUnderlyingStream())}.
@@ -827,7 +876,21 @@ public final <T> T setUserData(String key, T value) {
827876
* @throws IOException when failed to read value from input stream or reached
828877
* end of the stream
829878
*/
830-
public abstract long pipe(ClickHouseOutputStream output) throws IOException;
879+
public long pipe(ClickHouseOutputStream output) throws IOException {
880+
long count = 0L;
881+
if (output == null || output.isClosed()) {
882+
return count;
883+
}
884+
ensureOpen();
885+
886+
try (ClickHouseInputStream in = this) {
887+
for (ClickHouseByteBuffer buf : in) {
888+
count += buf.length();
889+
output.writeBuffer(buf);
890+
}
891+
}
892+
return count;
893+
}
831894

832895
/**
833896
* Reads an unsigned byte from the input stream. Unlike {@link #read()}, it will
@@ -910,7 +973,7 @@ public byte[] readBytes(int length) throws IOException {
910973
if (read == -1) {
911974
closeQuietly();
912975
throw offset == 0 ? new EOFException()
913-
: new IOException(ClickHouseUtils.format(ERROR_INCOMPLETE_READ, offset, length));
976+
: new StreamCorruptedException(ClickHouseUtils.format(ERROR_INCOMPLETE_READ, offset, length));
914977
} else {
915978
offset += read;
916979
}
@@ -1082,13 +1145,10 @@ public final void setCopyToTarget(OutputStream out) throws IOException {
10821145
if (this.copyTo != null) {
10831146
this.copyTo.flush();
10841147
} else if (out != null) {
1085-
// process remaining bytes in current buffer
1086-
readCustom((b, p, l) -> {
1087-
if (p < l) {
1088-
out.write(b, p, l - p);
1089-
}
1090-
return 0;
1091-
});
1148+
ClickHouseByteBuffer buf = getBuffer();
1149+
if (!buf.isEmpty()) {
1150+
out.write(buf.array(), buf.position(), buf.length());
1151+
}
10921152
}
10931153
this.copyTo = out;
10941154
}
@@ -1113,4 +1173,9 @@ public void close() throws IOException {
11131173
ClickHouseDataStreamFactory.handleCustomAction(postCloseAction);
11141174
}
11151175
}
1176+
1177+
@Override
1178+
public Iterator<ClickHouseByteBuffer> iterator() {
1179+
return new ByteBufferIterator(this);
1180+
}
11161181
}

0 commit comments

Comments
 (0)