Skip to content

Experimental transaction support #1008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ jobs:
strategy:
matrix:
# most recent LTS releases as well as latest stable builds
clickhouse: ["21.3", "21.8", "latest"]
clickhouse: ["21.8", "22.3", "latest"]
# http2 here represents http protocol + JDK HttpClient(http_connection_provider=HTTP_CLIENT)
protocol: ["http", "http2", "grpc"]
exclude:
- clickhouse: "21.3"
protocol: grpc
- clickhouse: "21.8"
protocol: grpc
fail-fast: false
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ Note: in general, the new driver(v0.3.2) is a few times faster with less memory
| | UUID | :white_check_mark: | |
| High Availability | Load Balancing | :white_check_mark: | supported since 0.3.2-patch10 |
| | Failover | :white_check_mark: | supported since 0.3.2-patch10 |
| Transaction | Transaction | :white_check_mark: | supported since 0.3.2-patch11, use ClickHouse 22.7+ for native implicit transaction support |
| | Savepoint | :x: | |
| | XAConnection | :x: | |

## Examples

Expand Down Expand Up @@ -178,7 +181,7 @@ It's time consuming to run all benchmarks against all drivers using different pa

## Testing

By default, docker container will be created automatically during integration test. You can pass system property like `-DclickhouseVersion=21.8` to specify version of ClickHouse.
By default, docker container will be created automatically during integration test. You can pass system property like `-DclickhouseVersion=22.3` to specify version of ClickHouse.

In the case you prefer to test against an existing server, please follow instructions below:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.clickhouse.client.ClickHouseChecker;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseCompression;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseFile;
Expand Down Expand Up @@ -136,8 +137,9 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
commands.add(DEFAULT_CLICKHOUSE_CLI_PATH);
}

static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request) {
static Process startProcess(ClickHouseRequest<?> request) {
final ClickHouseConfig config = request.getConfig();
final ClickHouseNode server = request.getServer();
final int timeout = config.getSocketTimeout();

String hostDir = (String) config.getOption(ClickHouseCommandLineOption.CLI_WORK_DIRECTORY);
Expand Down Expand Up @@ -196,7 +198,7 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
if (!ClickHouseChecker.isNullOrBlank(str)) {
commands.add("--query_id=".concat(str));
}
commands.add("--query=".concat(request.getStatements(false).get(0)));
commands.add("--query=".concat(str = request.getStatements(false).get(0)));

for (ClickHouseExternalTable table : request.getExternalTables()) {
ClickHouseFile tableFile = table.getFile();
Expand Down Expand Up @@ -331,30 +333,36 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
}
}

private final ClickHouseNode server;
private final ClickHouseRequest<?> request;

private final Process process;

private String error;

public ClickHouseCommandLine(ClickHouseNode server, ClickHouseRequest<?> request) {
this.server = server;
public ClickHouseCommandLine(ClickHouseRequest<?> request) {
this.request = request;

this.process = startProcess(server, request);
this.process = startProcess(request);
this.error = null;
}

public ClickHouseInputStream getInputStream() throws IOException {
ClickHouseOutputStream out = request.getOutputStream().orElse(null);
Runnable postCloseAction = () -> {
IOException exp = getError();
if (exp != null) {
throw new UncheckedIOException(exp);
}
};
if (out != null && !out.getUnderlyingFile().isAvailable()) {
try (OutputStream o = out) {
ClickHouseInputStream.pipe(process.getInputStream(), o, request.getConfig().getWriteBufferSize());
}
return ClickHouseInputStream.empty();
return ClickHouseInputStream.wrap(null, ClickHouseInputStream.empty(),
request.getConfig().getReadBufferSize(), postCloseAction, ClickHouseCompression.NONE, 0);
} else {
return ClickHouseInputStream.of(process.getInputStream(), request.getConfig().getReadBufferSize());
return ClickHouseInputStream.of(process.getInputStream(), request.getConfig().getReadBufferSize(),
postCloseAction);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected ClickHouseCommandLine newConnection(ClickHouseCommandLine conn, ClickH
closeConnection(conn, false);
}

return new ClickHouseCommandLine(server, request);
return new ClickHouseCommandLine(request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ public class ClickHouseCommandLineResponse extends ClickHouseStreamResponse {

protected ClickHouseCommandLineResponse(ClickHouseConfig config, ClickHouseCommandLine cli) throws IOException {
super(config, cli.getInputStream(), null, null, ClickHouseResponseSummary.EMPTY);
this.cli = cli;

if (this.input.available() < 1) {
IOException exp = cli.getError();
if (exp != null) {
throw exp;
}
}

this.cli = cli;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,6 @@ public void testCustomLoad() throws Exception {
throw new SkipException("Skip due to time out error");
}

@Test(groups = { "integration" })
@Override
public void testErrorDuringQuery() throws Exception {
throw new SkipException(
"Skip due to incomplete implementation(needs to consider ErrorOutputStream in deserialization as well)");
}

@Test(groups = { "integration" })
@Override
public void testLoadRawData() throws Exception {
Expand Down Expand Up @@ -92,6 +85,12 @@ public void testReadWriteGeoTypes() {
throw new SkipException("Skip due to session is not supported");
}

@Test(groups = { "integration" })
@Override
public void testSessionLock() {
throw new SkipException("Skip due to session is not supported");
}

@Test(groups = { "integration" })
@Override
public void testTempTable() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.clickhouse.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
Expand All @@ -16,6 +17,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import com.clickhouse.client.config.ClickHouseBufferingMode;
Expand Down Expand Up @@ -345,7 +347,7 @@ static CompletableFuture<ClickHouseResponseSummary> dump(ClickHouseNode server,
} finally {
try {
output.close();
} catch (Exception e) {
} catch (IOException e) {
// ignore
}
}
Expand Down Expand Up @@ -438,21 +440,22 @@ static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server,
stream.close();
}
// wait until write & read acomplished
try (ClickHouseResponse response = future.get()) {
try (ClickHouseResponse response = future.get(client.getConfig().getSocketTimeout(),
TimeUnit.MILLISECONDS)) {
return response.getSummary();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseException.forCancellation(e, theServer);
} catch (CancellationException e) {
throw ClickHouseException.forCancellation(e, theServer);
} catch (ExecutionException e) {
} catch (ExecutionException | TimeoutException e) {
throw ClickHouseException.of(e, theServer);
} finally {
if (input != null) {
try {
input.close();
} catch (Exception e) {
} catch (IOException e) {
// ignore
}
}
Expand Down Expand Up @@ -491,7 +494,7 @@ static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server,
} finally {
try {
input.close();
} catch (Exception e) {
} catch (IOException e) {
// ignore
}
}
Expand Down Expand Up @@ -546,7 +549,7 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
.option(ClickHouseClientOption.ASYNC, false).build()) {
ClickHouseRequest<?> request = client.connect(theServer).format(ClickHouseFormat.RowBinary);
if ((boolean) ClickHouseDefaults.AUTO_SESSION.getEffectiveDefaultValue() && queries.size() > 1) {
request.session(UUID.randomUUID().toString(), false);
request.session(request.getManager().createSessionId(), false);
}
for (String query : queries) {
try (ClickHouseResponse resp = request.query(query).executeAndWait()) {
Expand Down Expand Up @@ -818,13 +821,13 @@ default ClickHouseResponse executeAndWait(ClickHouseRequest<?> request) throws C
final ClickHouseRequest<?> sealedRequest = request.seal();

try {
return execute(sealedRequest).get();
return execute(sealedRequest).get(sealedRequest.getConfig().getSocketTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseException.forCancellation(e, sealedRequest.getServer());
} catch (CancellationException e) {
throw ClickHouseException.forCancellation(e, sealedRequest.getServer());
} catch (CompletionException | ExecutionException | UncheckedIOException e) {
} catch (CompletionException | ExecutionException | TimeoutException | UncheckedIOException e) {
Throwable cause = e.getCause();
if (cause == null) {
cause = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import com.clickhouse.client.config.ClickHouseOption;
Expand Down Expand Up @@ -80,6 +82,10 @@ public boolean ping(ClickHouseNode server, int timeout) {
static final class Agent implements ClickHouseClient {
private static final Logger log = LoggerFactory.getLogger(Agent.class);

private static final long INITIAL_REPEAT_DELAY = 100;
private static final long MAX_REPEAT_DELAY = 1000;
private static final long REPEAT_DELAY_BACKOFF = 100;

private final AtomicReference<ClickHouseClient> client;

Agent(ClickHouseClient client, ClickHouseConfig config) {
Expand Down Expand Up @@ -110,6 +116,7 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseExcept
ClickHouseNode current = sealedRequest.getServer();
ClickHouseNodeManager manager = current.manager.get();
if (manager == null) {
log.debug("Cancel failover for unmanaged node: %s", current);
break;
}
ClickHouseNode next = manager.suggestNode(current, exception);
Expand All @@ -118,8 +125,10 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseExcept
break;
}
current.update(Status.FAULTY);
next = sealedRequest.changeServer(current, next);
if (next == current) {
if (sealedRequest.isTransactional()) {
log.debug("Cancel failover for transactional context: %s", sealedRequest.getTransaction());
break;
} else if ((next = sealedRequest.changeServer(current, next)) == current) {
log.debug("Cancel failover for no alternative of %s", current);
break;
}
Expand Down Expand Up @@ -162,6 +171,59 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseExcept
throw new CompletionException(exception);
}

/**
* Repeats sending same request until success, timed out or running into a
* different error.
*
* @param sealedRequest non-null sealed request
* @param exception non-null exception to start with
* @param timeout timeout in milliseconds, zero or negative numbers means
* no repeat
* @return non-null response
* @throws CompletionException when error occurred or timed out
*/
ClickHouseResponse repeat(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, long timeout) {
if (timeout > 0) {
final int errorCode = exception.getErrorCode();
final long startTime = System.currentTimeMillis();

long delay = INITIAL_REPEAT_DELAY;
long elapsed = 0L;
int count = 1;
while (true) {
log.info("Repeating #%d (delay=%d, elapsed=%d, timeout=%d) due to: %s", count++, delay, elapsed,
timeout, exception.getMessage());
try {
return sendOnce(sealedRequest);
} catch (Exception exp) {
exception = ClickHouseException.of(exp.getCause() != null ? exp.getCause() : exp,
sealedRequest.getServer());
}

elapsed = System.currentTimeMillis() - startTime;
if (exception.getErrorCode() != errorCode || elapsed + delay >= timeout) {
log.warn("Stopped repeating(delay=%d, elapsed=%d, timeout=%d) for %s", delay, elapsed,
timeout, exception.getMessage());
break;
}

try {
Thread.sleep(delay);
elapsed += delay;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (delay >= MAX_REPEAT_DELAY) {
delay = MAX_REPEAT_DELAY;
} else {
delay += REPEAT_DELAY_BACKOFF;
}
}
}
throw new CompletionException(exception);
}

ClickHouseResponse retry(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, int times) {
for (int i = 1; i <= times; i++) {
log.debug("Retry %d of %d due to: %s", i, times, exception.getMessage());
Expand All @@ -186,18 +248,27 @@ ClickHouseResponse handle(ClickHouseRequest<?> sealedRequest, Throwable cause) {
cause = ((UncheckedIOException) cause).getCause();
}

log.debug("Handling %s(failover=%d, retry=%d)", cause, sealedRequest.getConfig().getFailover(),
sealedRequest.getConfig().getRetry());
ClickHouseConfig config = sealedRequest.getConfig();
log.debug("Handling %s(failover=%d, retry=%d)", cause, config.getFailover(), config.getRetry());
ClickHouseException ex = ClickHouseException.of(cause, sealedRequest.getServer());
try {
if (config.isRepeatOnSessionLock()
&& ex.getErrorCode() == ClickHouseException.ERROR_SESSION_IS_LOCKED) {
// connection timeout is usually a small number(defaults to 5000 ms), making it
// better default compare to socket timeout and max execution time etc.
return repeat(sealedRequest, ex, config.getSessionTimeout() <= 0 ? config.getConnectionTimeout()
: TimeUnit.SECONDS.toMillis(config.getSessionTimeout()));
}

int times = sealedRequest.getConfig().getFailover();
if (times > 0) {
return failover(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
return failover(sealedRequest, ex, times);
}

// different from failover: 1) retry on the same node; 2) never retry on timeout
times = sealedRequest.getConfig().getRetry();
if (times > 0) {
return retry(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
return retry(sealedRequest, ex, times);
}

throw new CompletionException(cause);
Expand All @@ -210,11 +281,12 @@ ClickHouseResponse handle(ClickHouseRequest<?> sealedRequest, Throwable cause) {

ClickHouseResponse sendOnce(ClickHouseRequest<?> sealedRequest) {
try {
return getClient().execute(sealedRequest).get();
return getClient().execute(sealedRequest).get(sealedRequest.getConfig().getSocketTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CancellationException("Execution was interrupted");
} catch (ExecutionException e) {
} catch (ExecutionException | TimeoutException e) {
throw new CompletionException(e.getCause());
}
}
Expand Down
Loading