Skip to content

Commit 8eb0fa7

Browse files
committed
Fix write timeout error
1 parent 262029f commit 8eb0fa7

File tree

3 files changed

+57
-26
lines changed

3 files changed

+57
-26
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/stream/NonBlockingPipedOutputStream.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,12 @@ private byte[] allocateBuffer() {
4848
}
4949

5050
private void updateBuffer(boolean allocateNewBuffer) throws IOException {
51-
if (position < 1) {
52-
return;
53-
}
54-
5551
updateBuffer(buffer, 0, position);
5652

5753
if (allocateNewBuffer) {
5854
buffer = allocateBuffer();
55+
} else {
56+
position = 0;
5957
}
6058
}
6159

clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHousePipedStreamTest.java

+31-10
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
import java.io.IOException;
44
import java.io.InputStream;
55
import java.io.OutputStream;
6+
import java.io.UncheckedIOException;
67
import java.nio.ByteBuffer;
78
import java.nio.Buffer;
9+
import java.util.Arrays;
810
import java.util.concurrent.CountDownLatch;
911
import java.util.concurrent.ExecutorService;
1012
import java.util.concurrent.Executors;
1113
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicInteger;
1215

1316
import com.clickhouse.client.ClickHouseByteBuffer;
1417

@@ -214,8 +217,10 @@ public void testPipedStream() throws Exception {
214217
for (int queueLength = -1; queueLength < 10; queueLength++) {
215218
ClickHousePipedStream stream = new ClickHousePipedStream(bufferSize, queueLength, timeout);
216219
try (InputStream in = stream.getInput(); OutputStream out = stream) {
217-
int count = 10000;
218-
CountDownLatch latch = new CountDownLatch(count + 1);
220+
final int count = 10000;
221+
final AtomicInteger p = new AtomicInteger(0);
222+
final AtomicInteger n = new AtomicInteger(0);
223+
final CountDownLatch latch = new CountDownLatch(count + 1);
219224
executor.execute(() -> {
220225
for (int i = 0; i < count; i++) {
221226
byte[] bytes = new byte[] { (byte) (0xFF & i), (byte) (0xFF & i + 1),
@@ -240,23 +245,39 @@ public void testPipedStream() throws Exception {
240245
(byte) (0xFF & i + 2) };
241246
byte[] b = new byte[bytes.length];
242247
try {
243-
Assert.assertEquals(in.read(b), b.length);
244-
latch.countDown();
245-
Assert.assertEquals(b, bytes);
248+
if (in.read(b) == b.length && Arrays.equals(b, bytes)) {
249+
p.incrementAndGet();
250+
} else {
251+
n.incrementAndGet();
252+
}
246253
} catch (IOException e) {
247-
Assert.fail("Failed to read", e);
254+
Thread.currentThread().interrupt();
255+
throw new UncheckedIOException(e);
256+
} finally {
257+
latch.countDown();
248258
}
249259
}
250260

251261
try {
252-
Assert.assertEquals(in.read(), -1);
253-
latch.countDown();
262+
if (in.read() == -1) {
263+
p.incrementAndGet();
264+
} else {
265+
n.incrementAndGet();
266+
}
254267
} catch (IOException e) {
255-
Assert.fail("Failed to read EOF", e);
268+
Thread.currentThread().interrupt();
269+
throw new UncheckedIOException(e);
270+
} finally {
271+
latch.countDown();
256272
}
257273
});
258274

259-
latch.await(timeout / 1000 * 3, TimeUnit.SECONDS);
275+
if (!latch.await(timeout / 1000, TimeUnit.SECONDS)) {
276+
Assert.fail(String.format("Countdown latch(%d of %d) timed out after waiting %d seconds",
277+
count + 1, latch.getCount(), timeout / 1000));
278+
}
279+
Assert.assertEquals(n.get(), 0);
280+
Assert.assertEquals(p.get(), count + 1);
260281
}
261282
}
262283
}

clickhouse-client/src/test/java/com/clickhouse/client/stream/NonBlockingPipedOutputStreamTest.java

+24-12
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.ExecutorService;
1010
import java.util.concurrent.Executors;
1111
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicInteger;
1213

1314
import com.clickhouse.client.ClickHouseByteBuffer;
1415

@@ -215,8 +216,10 @@ public void testPipedStream() throws Exception {
215216
final NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(bufferSize, queueLength,
216217
timeout, CapacityPolicy.fixedCapacity(queueLength), null);
217218
try (InputStream in = stream.getInputStream(); OutputStream out = stream) {
218-
int count = 10000;
219-
CountDownLatch latch = new CountDownLatch(count + 1);
219+
final int count = 10000;
220+
final AtomicInteger p = new AtomicInteger(0);
221+
final AtomicInteger n = new AtomicInteger(0);
222+
final CountDownLatch latch = new CountDownLatch(count + 1);
220223
executor.execute(() -> {
221224
for (int i = 0; i < count; i++) {
222225
byte[] bytes = new byte[] { (byte) (0xFF & i), (byte) (0xFF & i + 1),
@@ -245,30 +248,39 @@ public void testPipedStream() throws Exception {
245248
(byte) (0xFF & i + 2) };
246249
byte[] b = new byte[bytes.length];
247250
try {
248-
Assert.assertEquals(in.read(b), b.length);
249-
latch.countDown();
250-
if (!Arrays.equals(b, bytes)) {
251-
System.out.println("Why?");
251+
if (in.read(b) == b.length && Arrays.equals(b, bytes)) {
252+
p.incrementAndGet();
253+
} else {
254+
n.incrementAndGet();
252255
}
253-
Assert.assertEquals(b, bytes);
254256
} catch (IOException e) {
255257
Thread.currentThread().interrupt();
256258
throw new UncheckedIOException(e);
257-
// Assert.fail("Failed to read", e);
259+
} finally {
260+
latch.countDown();
258261
}
259262
}
260263

261264
try {
262-
Assert.assertEquals(in.read(), -1);
263-
latch.countDown();
265+
if (in.read() == -1) {
266+
p.incrementAndGet();
267+
} else {
268+
n.incrementAndGet();
269+
}
264270
} catch (IOException e) {
265271
Thread.currentThread().interrupt();
266272
throw new UncheckedIOException(e);
267-
// Assert.fail("Failed to read EOF", e);
273+
} finally {
274+
latch.countDown();
268275
}
269276
});
270277

271-
latch.await(timeout / 1000, TimeUnit.SECONDS);
278+
if (!latch.await(timeout / 1000, TimeUnit.SECONDS)) {
279+
Assert.fail(String.format("Countdown latch(%d of %d) timed out after waiting %d seconds",
280+
count + 1, latch.getCount(), timeout / 1000));
281+
}
282+
Assert.assertEquals(n.get(), 0);
283+
Assert.assertEquals(p.get(), count + 1);
272284
}
273285
}
274286
}

0 commit comments

Comments
 (0)