Skip to content

Enhance input/output stream and fix grpc test failure #1238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public boolean match(ClickHouseNode node) {
* Checks if the given protocol matches any of the preferred protocols.
*
* @param protocol protocol to check
* @return true if the protocl matches at least one of preferred protocols;
* @return true if the protocol matches at least one of preferred protocols;
* false otherwise
*/
public boolean matchAnyOfPreferredProtocols(ClickHouseProtocol protocol) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public void testInitialization() {
for (ClickHouseClient client : new ClickHouseClient[] { client1, client2, client3, client4, client5 }) {
Assert.assertEquals(client.getClass(), Agent.class);
Assert.assertEquals(((Agent) client).getClient().getClass(), getClientClass());
Assert.assertTrue(client.accept(getProtocol()), "The client should support protocl: " + getProtocol());
Assert.assertTrue(client.accept(getProtocol()), "The client should support protocol: " + getProtocol());
}
}
}
Expand Down Expand Up @@ -1541,6 +1541,8 @@ public void testDumpFile(boolean gzipCompressed, boolean useOneLiner)
}
}
Assert.assertNotNull(summary);
long fileSize = Files.size(file.toPath());
Assert.assertTrue(fileSize > 0L, "Expects an non-empty file being created");
try (InputStream in = gzipCompressed ? new GZIPInputStream(new FileInputStream(file))
: new FileInputStream(file); ByteArrayOutputStream out = new ByteArrayOutputStream()) {
ClickHouseInputStream.pipe(in, out, 512);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Serializable;
import java.math.BigInteger;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -36,6 +37,31 @@ public static ClickHouseByteBuffer newInstance() {
return new ClickHouseByteBuffer(EMPTY_BYTES, 0, 0);
}

/**
* Wraps given byte buffer.
*
* @param buffer byte buffer
* @return non-null wrapped byte buffer
*/
public static ClickHouseByteBuffer of(ByteBuffer buffer) {
if (buffer == null || !buffer.hasRemaining()) {
return newInstance();
}

int pos = buffer.position();
int len = buffer.remaining();
byte[] bytes;
if (buffer.hasArray()) {
bytes = buffer.array();
} else {
bytes = new byte[len];
buffer.get(bytes);
((Buffer) buffer).position(pos);
pos = 0;
}
return new ClickHouseByteBuffer(bytes, pos, len);
}

/**
* Wraps given byte array as byte buffer.
*
Expand Down Expand Up @@ -546,6 +572,31 @@ public ClickHouseByteBuffer slice(int offset, int length) {
return ClickHouseByteBuffer.of(array, offset + position, length);
}

/**
* Updates buffer.
*
* @param buffer byte buffer
* @return this byte buffer
*/
public ClickHouseByteBuffer update(ByteBuffer buffer) {
if (buffer == null || !buffer.hasRemaining()) {
return reset();
}

int pos = buffer.position();
int len = buffer.remaining();
byte[] bytes;
if (buffer.hasArray()) {
bytes = buffer.array();
} else {
bytes = new byte[len];
buffer.get(bytes);
((Buffer) buffer).position(pos);
pos = 0;
}
return update(bytes, pos, len);
}

/**
* Updates buffer.
*
Expand Down Expand Up @@ -659,6 +710,26 @@ public byte[] array() {
return array;
}

/**
* Creates a copy of the current byte buffer.
*
* @param deep true to copy the underlying byte array; false to reuse
* @return non-null copy of the current byte buffer
*/
public ClickHouseByteBuffer copy(boolean deep) {
byte[] bytes;
int pos;
if (deep) {
bytes = new byte[length];
pos = 0;
System.arraycopy(array, position, bytes, 0, length);
} else {
bytes = array;
pos = position;
}
return ClickHouseByteBuffer.of(bytes, pos, length);
}

public byte firstByte() {
return array[position];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -130,6 +132,19 @@ private ClickHouseRecord nextRecord() throws NoSuchElementException, UncheckedIO
final ClickHouseRecord r = config.isReuseValueWrapper() ? currentRecord : currentRecord.copy();
try {
readAndFill(r);
} catch (StreamCorruptedException e) {
byte[] search = "ode: ".getBytes(StandardCharsets.US_ASCII);
byte[] bytes = input.getBuffer().array();
int index = ClickHouseUtils.indexOf(bytes, search);
if (index > 0 && bytes[--index] == (byte) 'C') {
throw new UncheckedIOException(new String(bytes, index, bytes.length - index, StandardCharsets.UTF_8),
e);
} else {
throw new UncheckedIOException(
ClickHouseUtils.format(ERROR_FAILED_TO_READ, readPosition + 1, columns.length,
columns[readPosition]),
e);
}
} catch (EOFException e) {
if (readPosition == 0) { // end of the stream, which is fine
throw new NoSuchElementException("No more record");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -24,7 +27,6 @@

import com.clickhouse.data.stream.BlockingInputStream;
import com.clickhouse.data.stream.DeferredInputStream;
import com.clickhouse.data.stream.DelegatedInputStream;
import com.clickhouse.data.stream.EmptyInputStream;
import com.clickhouse.data.stream.RestrictedInputStream;
import com.clickhouse.data.stream.IterableByteArrayInputStream;
Expand All @@ -40,14 +42,45 @@
* creation as well as closing the stream when it reaches end of stream. This
* class is also responsible for creating various input stream as needed.
*/
public abstract class ClickHouseInputStream extends InputStream {
public abstract class ClickHouseInputStream extends InputStream implements Iterable<ClickHouseByteBuffer> {
protected static final String ERROR_INCOMPLETE_READ = "Reached end of input stream after reading %d of %d bytes";
protected static final String ERROR_NULL_BYTES = "Non-null byte array is required";
protected static final String ERROR_REUSE_BUFFER = "Please pass a different byte array instead of the same internal buffer for reading";
protected static final String ERROR_STREAM_CLOSED = "Input stream has been closed";

public static final String TYPE_NAME = "InputStream";

static class ByteBufferIterator implements Iterator<ClickHouseByteBuffer> {
private final ClickHouseInputStream input;

private ByteBufferIterator(ClickHouseInputStream input) {
this.input = input;
}

@Override
public boolean hasNext() {
try {
return input.available() > 0;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public ClickHouseByteBuffer next() {
try {
ClickHouseByteBuffer buffer = input.nextBuffer();
if (buffer.isEmpty() && input.available() < 1) {
throw new NoSuchElementException(
"No more byte buffer for read as we reached end of the stream");
}
return buffer;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

/**
* Wraps the given input stream.
*
Expand Down Expand Up @@ -733,6 +766,22 @@ protected void ensureOpen() throws IOException {
}
}

/**
* Gets reference to current byte buffer.
*
* @return non-null byte buffer
*/
protected abstract ClickHouseByteBuffer getBuffer();

/**
* Gets reference to next byte buffer available for read. An empty byte buffer
* will be returned ({@code nextBuffer().isEmpty() == true}), when it reaches
* end of the input stream.
*
* @return non-null byte buffer
*/
protected abstract ClickHouseByteBuffer nextBuffer() throws IOException;

/**
* Gets underlying file. Same as
* {@code ClickHouseFile.of(getUnderlyingStream())}.
Expand Down Expand Up @@ -827,7 +876,21 @@ public final <T> T setUserData(String key, T value) {
* @throws IOException when failed to read value from input stream or reached
* end of the stream
*/
public abstract long pipe(ClickHouseOutputStream output) throws IOException;
public long pipe(ClickHouseOutputStream output) throws IOException {
long count = 0L;
if (output == null || output.isClosed()) {
return count;
}
ensureOpen();

try (ClickHouseInputStream in = this) {
for (ClickHouseByteBuffer buf : in) {
count += buf.length();
output.writeBuffer(buf);
}
}
return count;
}

/**
* Reads an unsigned byte from the input stream. Unlike {@link #read()}, it will
Expand Down Expand Up @@ -910,7 +973,7 @@ public byte[] readBytes(int length) throws IOException {
if (read == -1) {
closeQuietly();
throw offset == 0 ? new EOFException()
: new IOException(ClickHouseUtils.format(ERROR_INCOMPLETE_READ, offset, length));
: new StreamCorruptedException(ClickHouseUtils.format(ERROR_INCOMPLETE_READ, offset, length));
} else {
offset += read;
}
Expand Down Expand Up @@ -1082,13 +1145,10 @@ public final void setCopyToTarget(OutputStream out) throws IOException {
if (this.copyTo != null) {
this.copyTo.flush();
} else if (out != null) {
// process remaining bytes in current buffer
readCustom((b, p, l) -> {
if (p < l) {
out.write(b, p, l - p);
}
return 0;
});
ClickHouseByteBuffer buf = getBuffer();
if (!buf.isEmpty()) {
out.write(buf.array(), buf.position(), buf.length());
}
}
this.copyTo = out;
}
Expand All @@ -1113,4 +1173,9 @@ public void close() throws IOException {
ClickHouseDataStreamFactory.handleCustomAction(postCloseAction);
}
}

@Override
public Iterator<ClickHouseByteBuffer> iterator() {
return new ByteBufferIterator(this);
}
}
Loading