Skip to content

Commit e7b0e6a

Browse files
committed
Revert changes made on streaming
1 parent e9a3bc7 commit e7b0e6a

File tree

4 files changed

+12
-20
lines changed

4 files changed

+12
-20
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private void ensureOpen() throws IOException {
4040
throw new IOException("Stream has been closed");
4141
}
4242

43-
if (buffer == null || (buffer != EMPTY && buffer.limit() > 0 && !buffer.hasRemaining())) {
43+
if (buffer == null || (buffer != EMPTY && !buffer.hasRemaining())) {
4444
updateBuffer();
4545
}
4646
}
@@ -65,17 +65,9 @@ private int updateBuffer() throws IOException {
6565

6666
@Override
6767
public int available() throws IOException {
68-
if (closed || buffer == EMPTY) {
69-
return 0;
70-
}
68+
ensureOpen();
7169

72-
int available = 0;
73-
if (buffer == null || (buffer.limit() > 0 && !buffer.hasRemaining())) {
74-
available = updateBuffer();
75-
} else {
76-
available = buffer.remaining();
77-
}
78-
return available;
70+
return buffer.remaining();
7971
}
8072

8173
@Override
@@ -95,7 +87,7 @@ public void close() throws IOException {
9587
public byte readByte() throws IOException {
9688
ensureOpen();
9789

98-
if (buffer == EMPTY || buffer.limit() == 0) {
90+
if (buffer == EMPTY) {
9991
close();
10092
throw new EOFException();
10193
}
@@ -107,7 +99,7 @@ public byte readByte() throws IOException {
10799
public int read() throws IOException {
108100
ensureOpen();
109101

110-
if (buffer == EMPTY || buffer.limit() == 0) {
102+
if (buffer == EMPTY) {
111103
return -1;
112104
}
113105

@@ -120,7 +112,7 @@ public int read(byte[] b, int off, int len) throws IOException {
120112

121113
int counter = 0;
122114
while (len > 0) {
123-
if (buffer == EMPTY || buffer.limit() == 0) {
115+
if (buffer == EMPTY) {
124116
return counter > 0 ? counter : -1;
125117
}
126118

clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public class ClickHouseLZ4InputStream extends ClickHouseInputStream {
2828
private boolean closed;
2929

3030
private boolean checkNext() throws IOException {
31-
if (currentBlock == null) {
31+
if (currentBlock == null || !currentBlock.hasRemaining()) {
3232
currentBlock = readNextBlock();
3333
}
34-
return currentBlock != null && currentBlock.hasRemaining();
34+
return currentBlock != null;
3535
}
3636

3737
// every block is:

clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseResponseHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.concurrent.CompletableFuture;
1010
import java.util.concurrent.CompletionStage;
1111
import java.util.concurrent.LinkedBlockingDeque;
12-
import java.util.concurrent.TimeUnit;
1312
import java.util.concurrent.Flow.Subscription;
1413
import java.util.concurrent.atomic.AtomicBoolean;
1514

clickhouse-http-client/src/main/java11/com/clickhouse/client/http/DefaultHttpConnection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,15 @@ private ClickHouseHttpResponse postString(HttpRequest.Builder reqBuilder, String
183183
reqBuilder.POST(HttpRequest.BodyPublishers.ofString(sql));
184184
HttpResponse<InputStream> r;
185185
try {
186-
// r = httpClient.send(reqBuilder.build(), responseInfo -> new
187-
// ExtendedResponseInputStream());
188-
r = httpClient.send(reqBuilder.build(),
186+
CompletableFuture<HttpResponse<InputStream>> f = httpClient.sendAsync(reqBuilder.build(),
189187
responseInfo -> new ClickHouseResponseHandler(config.getMaxBufferSize(),
190188
config.getSocketTimeout()));
189+
r = f.get();
191190
} catch (InterruptedException e) {
192191
Thread.currentThread().interrupt();
193192
throw new IOException("Thread was interrupted when posting request or receiving response", e);
193+
} catch (ExecutionException e) {
194+
throw new IOException("Failed to post query", e);
194195
}
195196
return buildResponse(r);
196197
}

0 commit comments

Comments
 (0)