Skip to content

Commit 262029f

Browse files
committed
Add fixed capacity policy for testing
1 parent 1bf60d7 commit 262029f

File tree

5 files changed

+36
-15
lines changed

5 files changed

+36
-15
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseDataStreamFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public ClickHousePipedOutputStream createPipedOutputStream(ClickHouseConfig conf
162162
}
163163
return blocking
164164
? new BlockingPipedOutputStream(bufferSize, queue, timeout, postCloseAction)
165-
: new NonBlockingPipedOutputStream(bufferSize, queue, timeout, postCloseAction);
165+
: new NonBlockingPipedOutputStream(bufferSize, queue, timeout, null, postCloseAction);
166166
}
167167

168168
public ClickHousePipedOutputStream createPipedOutputStream(int writeBufferSize, int queueSize, int timeout,

clickhouse-client/src/main/java/com/clickhouse/client/stream/CapacityPolicy.java

+17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22

33
@FunctionalInterface
44
public interface CapacityPolicy {
5+
static class FixedCapacity implements CapacityPolicy {
6+
private final int capacity;
7+
8+
protected FixedCapacity(int capacity) {
9+
this.capacity = capacity < 1 ? 0 : capacity;
10+
}
11+
12+
@Override
13+
public boolean ensureCapacity(int current) {
14+
return capacity < 1 || current < capacity;
15+
}
16+
}
17+
518
static class LinearDynamicCapacity implements CapacityPolicy {
619
private volatile int capacity;
720
private volatile int count;
@@ -31,6 +44,10 @@ public boolean ensureCapacity(int current) {
3144
}
3245
}
3346

47+
static CapacityPolicy fixedCapacity(int capacity) {
48+
return new FixedCapacity(capacity);
49+
}
50+
3451
static CapacityPolicy linearDynamicCapacity(int initialSize, int maxSize, int threshold) {
3552
return new LinearDynamicCapacity(initialSize, maxSize, threshold);
3653
}

clickhouse-client/src/main/java/com/clickhouse/client/stream/NonBlockingPipedOutputStream.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,12 @@ private void updateBuffer(byte[] bytes, int offset, int length) throws IOExcepti
8080
}
8181
}
8282

83-
public NonBlockingPipedOutputStream(int bufferSize, int queueLength, int timeout, Runnable postCloseAction) {
83+
public NonBlockingPipedOutputStream(int bufferSize, int queueLength, int timeout, CapacityPolicy policy,
84+
Runnable postCloseAction) {
8485
super(postCloseAction);
8586

86-
this.queue = new AdaptiveQueue<>(CapacityPolicy.linearDynamicCapacity(1, queueLength, 0));
87+
this.queue = new AdaptiveQueue<>(
88+
policy != null ? policy : CapacityPolicy.linearDynamicCapacity(1, queueLength, 0));
8789

8890
// may need an initialBufferSize and a monitor to update bufferSize in runtime
8991
this.bufferSize = ClickHouseUtils.getBufferSize(bufferSize,

clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseDataStreamFactoryTest.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ public void testCreatePipedOutputStream() throws Exception {
1818
// read in worker thread
1919
for (int i = 0; i < 256; i++) {
2020
CompletableFuture<Integer> future;
21-
try (ClickHousePipedOutputStream out = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(
22-
config,
23-
null)) {
21+
try (ClickHousePipedOutputStream out = ClickHouseDataStreamFactory.getInstance()
22+
.createPipedOutputStream(config, null)) {
2423
future = ClickHouseClient.submit(() -> {
2524
try (ClickHouseInputStream in = out.getInputStream()) {
2625
return in.read();
@@ -33,8 +32,7 @@ public void testCreatePipedOutputStream() throws Exception {
3332

3433
// write in worker thread
3534
for (int i = 0; i < 256; i++) {
36-
ClickHousePipedOutputStream out = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(
37-
config,
35+
ClickHousePipedOutputStream out = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config,
3836
null);
3937
final int num = i;
4038
try (ClickHouseInputStream in = out.getInputStream()) {

clickhouse-client/src/test/java/com/clickhouse/client/stream/NonBlockingPipedOutputStreamTest.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
public class NonBlockingPipedOutputStreamTest {
1919
@Test(groups = { "unit" })
2020
public void testRead() throws Exception {
21-
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, null);
21+
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, CapacityPolicy.fixedCapacity(3),
22+
null);
2223
Assert.assertEquals(stream.queue.size(), 0);
2324
try (InputStream in = stream.getInputStream()) {
2425
in.read();
@@ -76,7 +77,8 @@ public void testRead() throws Exception {
7677

7778
@Test(groups = { "unit" })
7879
public void testReadBytes() throws Exception {
79-
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, null);
80+
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(4, 3, 1, CapacityPolicy.fixedCapacity(3),
81+
null);
8082
Assert.assertEquals(stream.queue.size(), 0);
8183
byte[] bytes = new byte[3];
8284
try (InputStream in = stream.getInputStream()) {
@@ -138,7 +140,8 @@ public void testReadBytes() throws Exception {
138140

139141
@Test(groups = { "unit" })
140142
public void testWrite() throws Exception {
141-
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, null);
143+
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, CapacityPolicy.fixedCapacity(3),
144+
null);
142145
Assert.assertEquals(stream.queue.size(), 0);
143146
try (OutputStream out = stream) {
144147
out.write(5);
@@ -153,7 +156,7 @@ public void testWrite() throws Exception {
153156
Assert.assertEquals(stream.queue.poll(), new byte[] { (byte) 7 });
154157
}
155158

156-
stream = new NonBlockingPipedOutputStream(1, 1, 2, null);
159+
stream = new NonBlockingPipedOutputStream(1, 1, 2, CapacityPolicy.fixedCapacity(1), null);
157160
Assert.assertEquals(stream.queue.size(), 0);
158161
try (OutputStream out = stream) {
159162
out.write(5);
@@ -178,7 +181,8 @@ public void testWrite() throws Exception {
178181

179182
@Test(groups = { "unit" })
180183
public void testWriteBytes() throws Exception {
181-
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, null);
184+
NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(2, 3, 2, CapacityPolicy.fixedCapacity(3),
185+
null);
182186
Assert.assertEquals(stream.queue.size(), 0);
183187
try (OutputStream out = stream) {
184188
out.write(new byte[] { (byte) 9, (byte) 10 });
@@ -204,12 +208,12 @@ public void testWriteBytes() throws Exception {
204208

205209
@Test(groups = { "unit" })
206210
public void testPipedStream() throws Exception {
207-
final int timeout = 60000;
211+
final int timeout = 10000;
208212
ExecutorService executor = Executors.newFixedThreadPool(2);
209213
for (int bufferSize = -1; bufferSize < 10; bufferSize++) {
210214
for (int queueLength = -1; queueLength < 10; queueLength++) {
211215
final NonBlockingPipedOutputStream stream = new NonBlockingPipedOutputStream(bufferSize, queueLength,
212-
timeout, null);
216+
timeout, CapacityPolicy.fixedCapacity(queueLength), null);
213217
try (InputStream in = stream.getInputStream(); OutputStream out = stream) {
214218
int count = 10000;
215219
CountDownLatch latch = new CountDownLatch(count + 1);

0 commit comments

Comments
 (0)