Skip to content

Commit 4748b8a

Browse files
committed
Enhance pipeline and introduce buffering mode
1 parent a849396 commit 4748b8a

File tree

61 files changed

+2941
-495
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+2941
-495
lines changed

clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/client/ClientState.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.clickhouse.client.ClickHouseClient;
1616
import com.clickhouse.client.ClickHouseClientBuilder;
1717
import com.clickhouse.client.ClickHouseCredentials;
18+
import com.clickhouse.client.ClickHouseException;
1819
import com.clickhouse.client.ClickHouseCompression;
1920
import com.clickhouse.client.ClickHouseFormat;
2021
import com.clickhouse.client.ClickHouseNode;
@@ -79,7 +80,7 @@ private ClickHouseClient createClient() {
7980
}
8081

8182
@Setup(Level.Trial)
82-
public void doSetup(ServerState serverState) throws Exception {
83+
public void doSetup(ServerState serverState) throws ClickHouseException {
8384
server = ClickHouseNode.builder().host(serverState.getHost()).port(ClickHouseProtocol.valueOf(protocol))
8485
.database(serverState.getDatabase())
8586
.credentials(
@@ -91,18 +92,18 @@ public void doSetup(ServerState serverState) throws Exception {
9192
"create table if not exists system.test_insert(id String, i Nullable(UInt64), s Nullable(String), t Nullable(DateTime))engine=Memory" };
9293

9394
for (String sql : sqls) {
94-
try (ClickHouseResponse resp = client.connect(server).query(sql).execute().get()) {
95+
try (ClickHouseResponse resp = client.connect(server).query(sql).executeAndWait()) {
9596

9697
}
9798
}
9899
}
99100

100101
@TearDown(Level.Trial)
101-
public void doTearDown(ServerState serverState) throws Exception {
102+
public void doTearDown(ServerState serverState) throws ClickHouseException {
102103
dispose();
103104

104-
try (ClickHouseResponse resp = client.connect(server).query("truncate table system.test_insert").execute()
105-
.get()) {
105+
try (ClickHouseResponse resp = client.connect(server).query("truncate table system.test_insert")
106+
.executeAndWait()) {
106107

107108
} finally {
108109
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package com.clickhouse.benchmark.misc;
2+
3+
import java.io.ByteArrayInputStream;
4+
import java.io.ByteArrayOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.Serializable;
8+
import java.util.Arrays;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
import java.util.Random;
12+
import java.util.concurrent.TimeUnit;
13+
14+
import com.clickhouse.benchmark.BaseState;
15+
import com.clickhouse.client.ClickHouseClient;
16+
import com.clickhouse.client.ClickHouseConfig;
17+
import com.clickhouse.client.ClickHouseInputStream;
18+
import com.clickhouse.client.ClickHouseOutputStream;
19+
import com.clickhouse.client.config.ClickHouseBufferingMode;
20+
import com.clickhouse.client.config.ClickHouseClientOption;
21+
import com.clickhouse.client.config.ClickHouseOption;
22+
23+
import org.openjdk.jmh.annotations.Benchmark;
24+
import org.openjdk.jmh.annotations.BenchmarkMode;
25+
import org.openjdk.jmh.annotations.Fork;
26+
import org.openjdk.jmh.annotations.Level;
27+
import org.openjdk.jmh.annotations.Measurement;
28+
import org.openjdk.jmh.annotations.Mode;
29+
import org.openjdk.jmh.annotations.OutputTimeUnit;
30+
import org.openjdk.jmh.annotations.Scope;
31+
import org.openjdk.jmh.annotations.Setup;
32+
import org.openjdk.jmh.annotations.State;
33+
import org.openjdk.jmh.annotations.Threads;
34+
import org.openjdk.jmh.annotations.Warmup;
35+
import org.openjdk.jmh.infra.Blackhole;
36+
37+
/**
38+
* Blocking:
39+
* Benchmark Mode Cnt Score Error Units
40+
* StreamBenchmark.async thrpt 20 1.574 ? 0.039 ops/s
41+
* StreamBenchmark.jdk thrpt 20 4281.206 ? 91.983 ops/s
42+
* StreamBenchmark.piped thrpt 20 3913.994 ? 142.566 ops/s
43+
* StreamBenchmark.wrapped thrpt 20 3939.248 ? 54.868 ops/s
44+
*
45+
* Non-blocking:
46+
*
47+
*/
48+
@State(Scope.Benchmark)
49+
@Warmup(iterations = 10, timeUnit = TimeUnit.SECONDS, time = 1)
50+
@Measurement(iterations = 10, timeUnit = TimeUnit.SECONDS, time = 1)
51+
@Fork(value = 2)
52+
@Threads(value = -1)
53+
@BenchmarkMode(Mode.Throughput)
54+
@OutputTimeUnit(TimeUnit.SECONDS)
55+
public class StreamBenchmark {
56+
@State(Scope.Thread)
57+
public static class StreamState extends BaseState {
58+
public int bufferSize;
59+
public int samples;
60+
61+
public byte[] bytes;
62+
public ClickHouseConfig config;
63+
64+
@Setup(Level.Trial)
65+
public void setupSamples() {
66+
bufferSize = Integer.getInteger("buffer",
67+
(int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue());
68+
samples = Integer.getInteger("samples", 500000);
69+
70+
bytes = new byte[samples];
71+
72+
Map<ClickHouseOption, Serializable> options = new HashMap<>();
73+
options.put(ClickHouseClientOption.ASYNC, Boolean.parseBoolean(System.getProperty("async", "true")));
74+
options.put(ClickHouseClientOption.REQUEST_BUFFERING, ClickHouseBufferingMode.valueOf(
75+
System.getProperty("mode", ClickHouseClientOption.REQUEST_BUFFERING.getDefaultValue().toString())
76+
.toUpperCase()));
77+
options.put(ClickHouseClientOption.WRITE_BUFFER_SIZE, bufferSize);
78+
options.put(ClickHouseClientOption.MAX_QUEUED_BUFFERS,
79+
Integer.getInteger("queue", (int) ClickHouseClientOption.MAX_QUEUED_BUFFERS.getDefaultValue()));
80+
options.put(ClickHouseClientOption.COMPRESS, Boolean.parseBoolean(System.getProperty("compress", "false")));
81+
options.put(ClickHouseClientOption.DECOMPRESS,
82+
Boolean.parseBoolean(System.getProperty("compress", "false")));
83+
options.put(ClickHouseClientOption.USE_BLOCKING_QUEUE,
84+
Boolean.parseBoolean(System.getProperty("blocking", "true")));
85+
config = new ClickHouseConfig(options, null, null, null);
86+
}
87+
88+
@Setup(Level.Iteration)
89+
public void initStream() {
90+
new Random().nextBytes(bytes);
91+
}
92+
}
93+
94+
@Benchmark
95+
public void classic(StreamState state, Blackhole consumer) throws IOException {
96+
int size = state.bufferSize;
97+
byte[] buffer = new byte[size];
98+
int count = 0;
99+
ByteArrayOutputStream out = new ByteArrayOutputStream(state.samples);
100+
try (InputStream in = new ByteArrayInputStream(state.bytes)) {
101+
int read = 0;
102+
while ((read = in.read(buffer, 0, size)) > 0) {
103+
out.write(buffer, 0, read);
104+
count += read;
105+
}
106+
if (count != state.samples) {
107+
throw new IllegalStateException(String.format("Expect %d bytes but got %d", size, count));
108+
}
109+
out.flush();
110+
out.close();
111+
}
112+
if (!Arrays.equals(state.bytes, out.toByteArray())) {
113+
throw new IllegalStateException("Incorrect result");
114+
}
115+
}
116+
117+
@Benchmark
118+
public void piped(StreamState state, Blackhole consumer) throws IOException {
119+
int size = state.bufferSize;
120+
long count = 0;
121+
ByteArrayOutputStream out = new ByteArrayOutputStream(state.samples);
122+
try (InputStream in = new ByteArrayInputStream(state.bytes)) {
123+
if ((count = ClickHouseInputStream.pipe(in, out, size)) != state.samples) {
124+
throw new IllegalStateException(String.format("Expect %d bytes but got %d", size, count));
125+
}
126+
out.flush();
127+
out.close();
128+
}
129+
if (!Arrays.equals(state.bytes, out.toByteArray())) {
130+
throw new IllegalStateException("Incorrect result");
131+
}
132+
}
133+
134+
@Benchmark
135+
public void wrapped(StreamState state, Blackhole consumer) throws IOException {
136+
int size = state.bufferSize;
137+
long count = 0;
138+
ByteArrayOutputStream bao = new ByteArrayOutputStream(state.samples);
139+
try (ClickHouseInputStream in = ClickHouseInputStream.of(new ByteArrayInputStream(state.bytes), size);
140+
ClickHouseOutputStream out = ClickHouseOutputStream.of(bao, size)) {
141+
if ((count = in.pipe(out)) != state.samples) {
142+
throw new IllegalStateException(String.format("Expect %d bytes but got %d", size, count));
143+
}
144+
out.flush();
145+
}
146+
if (!Arrays.equals(state.bytes, bao.toByteArray())) {
147+
throw new IllegalStateException("Incorrect result");
148+
}
149+
}
150+
151+
@Benchmark
152+
public void async(StreamState state, Blackhole consumer) throws IOException {
153+
int size = state.bufferSize;
154+
long count = 0;
155+
ByteArrayOutputStream bao = new ByteArrayOutputStream(state.samples);
156+
try (ClickHouseInputStream in = ClickHouseInputStream.of(new ByteArrayInputStream(state.bytes), size);
157+
ClickHouseOutputStream out = ClickHouseClient.getAsyncRequestOutputStream(state.config, bao, null)) {
158+
if ((count = in.pipe(out)) != state.samples) {
159+
throw new IllegalStateException(String.format("Expect %d bytes but got %d", size, count));
160+
}
161+
out.flush();
162+
}
163+
if (!Arrays.equals(state.bytes, bao.toByteArray())) {
164+
throw new IllegalStateException("Incorrect result");
165+
}
166+
}
167+
}

0 commit comments

Comments
 (0)