Skip to content

Commit b5d610f

Browse files
authored
Merge pull request #1008 from zhicwu/develop
Experimental transaction support
2 parents 7dcfae5 + 1d68dcd commit b5d610f

File tree

50 files changed

+2828
-510
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2828
-510
lines changed

.github/workflows/build.yml

+1-3
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,10 @@ jobs:
3737
strategy:
3838
matrix:
3939
# most recent LTS releases as well as latest stable builds
40-
clickhouse: ["21.3", "21.8", "latest"]
40+
clickhouse: ["21.8", "22.3", "latest"]
4141
# http2 here represents http protocol + JDK HttpClient(http_connection_provider=HTTP_CLIENT)
4242
protocol: ["http", "http2", "grpc"]
4343
exclude:
44-
- clickhouse: "21.3"
45-
protocol: grpc
4644
- clickhouse: "21.8"
4745
protocol: grpc
4846
fail-fast: false

README.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ Note: in general, the new driver(v0.3.2) is a few times faster with less memory
5151
| | UUID | :white_check_mark: | |
5252
| High Availability | Load Balancing | :white_check_mark: | supported since 0.3.2-patch10 |
5353
| | Failover | :white_check_mark: | supported since 0.3.2-patch10 |
54+
| Transaction | Transaction | :white_check_mark: | supported since 0.3.2-patch11, use ClickHouse 22.7+ for native implicit transaction support |
55+
| | Savepoint | :x: | |
56+
| | XAConnection | :x: | |
5457

5558
## Examples
5659

@@ -178,7 +181,7 @@ It's time consuming to run all benchmarks against all drivers using different pa
178181

179182
## Testing
180183

181-
By default, docker container will be created automatically during integration test. You can pass system property like `-DclickhouseVersion=21.8` to specify version of ClickHouse.
184+
By default, docker container will be created automatically during integration test. You can pass system property like `-DclickhouseVersion=22.3` to specify version of ClickHouse.
182185

183186
In the case you prefer to test against an existing server, please follow instructions below:
184187

clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import com.clickhouse.client.ClickHouseChecker;
3131
import com.clickhouse.client.ClickHouseClient;
32+
import com.clickhouse.client.ClickHouseCompression;
3233
import com.clickhouse.client.ClickHouseConfig;
3334
import com.clickhouse.client.ClickHouseCredentials;
3435
import com.clickhouse.client.ClickHouseFile;
@@ -136,8 +137,9 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
136137
commands.add(DEFAULT_CLICKHOUSE_CLI_PATH);
137138
}
138139

139-
static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request) {
140+
static Process startProcess(ClickHouseRequest<?> request) {
140141
final ClickHouseConfig config = request.getConfig();
142+
final ClickHouseNode server = request.getServer();
141143
final int timeout = config.getSocketTimeout();
142144

143145
String hostDir = (String) config.getOption(ClickHouseCommandLineOption.CLI_WORK_DIRECTORY);
@@ -196,7 +198,7 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
196198
if (!ClickHouseChecker.isNullOrBlank(str)) {
197199
commands.add("--query_id=".concat(str));
198200
}
199-
commands.add("--query=".concat(request.getStatements(false).get(0)));
201+
commands.add("--query=".concat(str = request.getStatements(false).get(0)));
200202

201203
for (ClickHouseExternalTable table : request.getExternalTables()) {
202204
ClickHouseFile tableFile = table.getFile();
@@ -331,30 +333,36 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
331333
}
332334
}
333335

334-
private final ClickHouseNode server;
335336
private final ClickHouseRequest<?> request;
336337

337338
private final Process process;
338339

339340
private String error;
340341

341-
public ClickHouseCommandLine(ClickHouseNode server, ClickHouseRequest<?> request) {
342-
this.server = server;
342+
public ClickHouseCommandLine(ClickHouseRequest<?> request) {
343343
this.request = request;
344344

345-
this.process = startProcess(server, request);
345+
this.process = startProcess(request);
346346
this.error = null;
347347
}
348348

349349
public ClickHouseInputStream getInputStream() throws IOException {
350350
ClickHouseOutputStream out = request.getOutputStream().orElse(null);
351+
Runnable postCloseAction = () -> {
352+
IOException exp = getError();
353+
if (exp != null) {
354+
throw new UncheckedIOException(exp);
355+
}
356+
};
351357
if (out != null && !out.getUnderlyingFile().isAvailable()) {
352358
try (OutputStream o = out) {
353359
ClickHouseInputStream.pipe(process.getInputStream(), o, request.getConfig().getWriteBufferSize());
354360
}
355-
return ClickHouseInputStream.empty();
361+
return ClickHouseInputStream.wrap(null, ClickHouseInputStream.empty(),
362+
request.getConfig().getReadBufferSize(), postCloseAction, ClickHouseCompression.NONE, 0);
356363
} else {
357-
return ClickHouseInputStream.of(process.getInputStream(), request.getConfig().getReadBufferSize());
364+
return ClickHouseInputStream.of(process.getInputStream(), request.getConfig().getReadBufferSize(),
365+
postCloseAction);
358366
}
359367
}
360368

clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ protected ClickHouseCommandLine newConnection(ClickHouseCommandLine conn, ClickH
4545
closeConnection(conn, false);
4646
}
4747

48-
return new ClickHouseCommandLine(server, request);
48+
return new ClickHouseCommandLine(request);
4949
}
5050

5151
@Override

clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@ public class ClickHouseCommandLineResponse extends ClickHouseStreamResponse {
1212

1313
protected ClickHouseCommandLineResponse(ClickHouseConfig config, ClickHouseCommandLine cli) throws IOException {
1414
super(config, cli.getInputStream(), null, null, ClickHouseResponseSummary.EMPTY);
15+
this.cli = cli;
1516

1617
if (this.input.available() < 1) {
1718
IOException exp = cli.getError();
1819
if (exp != null) {
1920
throw exp;
2021
}
2122
}
22-
23-
this.cli = cli;
2423
}
2524

2625
@Override

clickhouse-cli-client/src/test/java/com/clickhouse/client/cli/ClickHouseCommandLineClientTest.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,6 @@ public void testCustomLoad() throws Exception {
5353
throw new SkipException("Skip due to time out error");
5454
}
5555

56-
@Test(groups = { "integration" })
57-
@Override
58-
public void testErrorDuringQuery() throws Exception {
59-
throw new SkipException(
60-
"Skip due to incomplete implementation(needs to consider ErrorOutputStream in deserialization as well)");
61-
}
62-
6356
@Test(groups = { "integration" })
6457
@Override
6558
public void testLoadRawData() throws Exception {
@@ -92,6 +85,12 @@ public void testReadWriteGeoTypes() {
9285
throw new SkipException("Skip due to session is not supported");
9386
}
9487

88+
@Test(groups = { "integration" })
89+
@Override
90+
public void testSessionLock() {
91+
throw new SkipException("Skip due to session is not supported");
92+
}
93+
9594
@Test(groups = { "integration" })
9695
@Override
9796
public void testTempTable() {

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.clickhouse.client;
22

3+
import java.io.IOException;
34
import java.io.InputStream;
45
import java.io.OutputStream;
56
import java.io.UncheckedIOException;
@@ -16,6 +17,7 @@
1617
import java.util.concurrent.ExecutionException;
1718
import java.util.concurrent.ExecutorService;
1819
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.TimeoutException;
1921
import java.util.function.Function;
2022

2123
import com.clickhouse.client.config.ClickHouseBufferingMode;
@@ -345,7 +347,7 @@ static CompletableFuture<ClickHouseResponseSummary> dump(ClickHouseNode server,
345347
} finally {
346348
try {
347349
output.close();
348-
} catch (Exception e) {
350+
} catch (IOException e) {
349351
// ignore
350352
}
351353
}
@@ -438,21 +440,22 @@ static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server,
438440
stream.close();
439441
}
440442
// wait until write & read acomplished
441-
try (ClickHouseResponse response = future.get()) {
443+
try (ClickHouseResponse response = future.get(client.getConfig().getSocketTimeout(),
444+
TimeUnit.MILLISECONDS)) {
442445
return response.getSummary();
443446
}
444447
} catch (InterruptedException e) {
445448
Thread.currentThread().interrupt();
446449
throw ClickHouseException.forCancellation(e, theServer);
447450
} catch (CancellationException e) {
448451
throw ClickHouseException.forCancellation(e, theServer);
449-
} catch (ExecutionException e) {
452+
} catch (ExecutionException | TimeoutException e) {
450453
throw ClickHouseException.of(e, theServer);
451454
} finally {
452455
if (input != null) {
453456
try {
454457
input.close();
455-
} catch (Exception e) {
458+
} catch (IOException e) {
456459
// ignore
457460
}
458461
}
@@ -491,7 +494,7 @@ static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server,
491494
} finally {
492495
try {
493496
input.close();
494-
} catch (Exception e) {
497+
} catch (IOException e) {
495498
// ignore
496499
}
497500
}
@@ -546,7 +549,7 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
546549
.option(ClickHouseClientOption.ASYNC, false).build()) {
547550
ClickHouseRequest<?> request = client.connect(theServer).format(ClickHouseFormat.RowBinary);
548551
if ((boolean) ClickHouseDefaults.AUTO_SESSION.getEffectiveDefaultValue() && queries.size() > 1) {
549-
request.session(UUID.randomUUID().toString(), false);
552+
request.session(request.getManager().createSessionId(), false);
550553
}
551554
for (String query : queries) {
552555
try (ClickHouseResponse resp = request.query(query).executeAndWait()) {
@@ -818,13 +821,13 @@ default ClickHouseResponse executeAndWait(ClickHouseRequest<?> request) throws C
818821
final ClickHouseRequest<?> sealedRequest = request.seal();
819822

820823
try {
821-
return execute(sealedRequest).get();
824+
return execute(sealedRequest).get(sealedRequest.getConfig().getSocketTimeout(), TimeUnit.MILLISECONDS);
822825
} catch (InterruptedException e) {
823826
Thread.currentThread().interrupt();
824827
throw ClickHouseException.forCancellation(e, sealedRequest.getServer());
825828
} catch (CancellationException e) {
826829
throw ClickHouseException.forCancellation(e, sealedRequest.getServer());
827-
} catch (CompletionException | ExecutionException | UncheckedIOException e) {
830+
} catch (CompletionException | ExecutionException | TimeoutException | UncheckedIOException e) {
828831
Throwable cause = e.getCause();
829832
if (cause == null) {
830833
cause = e;

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java

+80-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.concurrent.ExecutorService;
1616
import java.util.concurrent.Executors;
1717
import java.util.concurrent.ScheduledExecutorService;
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.TimeoutException;
1820
import java.util.concurrent.atomic.AtomicReference;
1921

2022
import com.clickhouse.client.config.ClickHouseOption;
@@ -80,6 +82,10 @@ public boolean ping(ClickHouseNode server, int timeout) {
8082
static final class Agent implements ClickHouseClient {
8183
private static final Logger log = LoggerFactory.getLogger(Agent.class);
8284

85+
private static final long INITIAL_REPEAT_DELAY = 100;
86+
private static final long MAX_REPEAT_DELAY = 1000;
87+
private static final long REPEAT_DELAY_BACKOFF = 100;
88+
8389
private final AtomicReference<ClickHouseClient> client;
8490

8591
Agent(ClickHouseClient client, ClickHouseConfig config) {
@@ -110,6 +116,7 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseExcept
110116
ClickHouseNode current = sealedRequest.getServer();
111117
ClickHouseNodeManager manager = current.manager.get();
112118
if (manager == null) {
119+
log.debug("Cancel failover for unmanaged node: %s", current);
113120
break;
114121
}
115122
ClickHouseNode next = manager.suggestNode(current, exception);
@@ -118,8 +125,10 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseExcept
118125
break;
119126
}
120127
current.update(Status.FAULTY);
121-
next = sealedRequest.changeServer(current, next);
122-
if (next == current) {
128+
if (sealedRequest.isTransactional()) {
129+
log.debug("Cancel failover for transactional context: %s", sealedRequest.getTransaction());
130+
break;
131+
} else if ((next = sealedRequest.changeServer(current, next)) == current) {
123132
log.debug("Cancel failover for no alternative of %s", current);
124133
break;
125134
}
@@ -162,6 +171,59 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseExcept
162171
throw new CompletionException(exception);
163172
}
164173

174+
/**
175+
* Repeats sending same request until success, timed out or running into a
176+
* different error.
177+
*
178+
* @param sealedRequest non-null sealed request
179+
* @param exception non-null exception to start with
180+
* @param timeout timeout in milliseconds, zero or negative numbers means
181+
* no repeat
182+
* @return non-null response
183+
* @throws CompletionException when error occurred or timed out
184+
*/
185+
ClickHouseResponse repeat(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, long timeout) {
186+
if (timeout > 0) {
187+
final int errorCode = exception.getErrorCode();
188+
final long startTime = System.currentTimeMillis();
189+
190+
long delay = INITIAL_REPEAT_DELAY;
191+
long elapsed = 0L;
192+
int count = 1;
193+
while (true) {
194+
log.info("Repeating #%d (delay=%d, elapsed=%d, timeout=%d) due to: %s", count++, delay, elapsed,
195+
timeout, exception.getMessage());
196+
try {
197+
return sendOnce(sealedRequest);
198+
} catch (Exception exp) {
199+
exception = ClickHouseException.of(exp.getCause() != null ? exp.getCause() : exp,
200+
sealedRequest.getServer());
201+
}
202+
203+
elapsed = System.currentTimeMillis() - startTime;
204+
if (exception.getErrorCode() != errorCode || elapsed + delay >= timeout) {
205+
log.warn("Stopped repeating(delay=%d, elapsed=%d, timeout=%d) for %s", delay, elapsed,
206+
timeout, exception.getMessage());
207+
break;
208+
}
209+
210+
try {
211+
Thread.sleep(delay);
212+
elapsed += delay;
213+
} catch (InterruptedException e) {
214+
Thread.currentThread().interrupt();
215+
break;
216+
}
217+
if (delay >= MAX_REPEAT_DELAY) {
218+
delay = MAX_REPEAT_DELAY;
219+
} else {
220+
delay += REPEAT_DELAY_BACKOFF;
221+
}
222+
}
223+
}
224+
throw new CompletionException(exception);
225+
}
226+
165227
ClickHouseResponse retry(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, int times) {
166228
for (int i = 1; i <= times; i++) {
167229
log.debug("Retry %d of %d due to: %s", i, times, exception.getMessage());
@@ -186,18 +248,27 @@ ClickHouseResponse handle(ClickHouseRequest<?> sealedRequest, Throwable cause) {
186248
cause = ((UncheckedIOException) cause).getCause();
187249
}
188250

189-
log.debug("Handling %s(failover=%d, retry=%d)", cause, sealedRequest.getConfig().getFailover(),
190-
sealedRequest.getConfig().getRetry());
251+
ClickHouseConfig config = sealedRequest.getConfig();
252+
log.debug("Handling %s(failover=%d, retry=%d)", cause, config.getFailover(), config.getRetry());
253+
ClickHouseException ex = ClickHouseException.of(cause, sealedRequest.getServer());
191254
try {
255+
if (config.isRepeatOnSessionLock()
256+
&& ex.getErrorCode() == ClickHouseException.ERROR_SESSION_IS_LOCKED) {
257+
// connection timeout is usually a small number(defaults to 5000 ms), making it
258+
// better default compare to socket timeout and max execution time etc.
259+
return repeat(sealedRequest, ex, config.getSessionTimeout() <= 0 ? config.getConnectionTimeout()
260+
: TimeUnit.SECONDS.toMillis(config.getSessionTimeout()));
261+
}
262+
192263
int times = sealedRequest.getConfig().getFailover();
193264
if (times > 0) {
194-
return failover(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
265+
return failover(sealedRequest, ex, times);
195266
}
196267

197268
// different from failover: 1) retry on the same node; 2) never retry on timeout
198269
times = sealedRequest.getConfig().getRetry();
199270
if (times > 0) {
200-
return retry(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
271+
return retry(sealedRequest, ex, times);
201272
}
202273

203274
throw new CompletionException(cause);
@@ -210,11 +281,12 @@ ClickHouseResponse handle(ClickHouseRequest<?> sealedRequest, Throwable cause) {
210281

211282
ClickHouseResponse sendOnce(ClickHouseRequest<?> sealedRequest) {
212283
try {
213-
return getClient().execute(sealedRequest).get();
284+
return getClient().execute(sealedRequest).get(sealedRequest.getConfig().getSocketTimeout(),
285+
TimeUnit.MILLISECONDS);
214286
} catch (InterruptedException e) {
215287
Thread.currentThread().interrupt();
216288
throw new CancellationException("Execution was interrupted");
217-
} catch (ExecutionException e) {
289+
} catch (ExecutionException | TimeoutException e) {
218290
throw new CompletionException(e.getCause());
219291
}
220292
}

0 commit comments

Comments
 (0)