diff --git a/README.md b/README.md index b83d56312..315689e7a 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Note: in general, the new driver(v0.3.2+) is a few times faster with less memory | API | [JDBC](https://docs.oracle.com/javase/8/docs/technotes/guides/jdbc/) | :white_check_mark: | | | | [R2DBC](https://r2dbc.io/) | :x: | will be supported in 0.3.3 | | | [GraphQL](https://graphql.org/) | :x: | | -| Protocol | [HTTP](https://clickhouse.com/docs/en/interfaces/http/) | :white_check_mark: | recommended, defaults to `java.net.HttpURLConnection` and can be changed to `java.net.http.HttpClient`(less stable) | +| Protocol | [HTTP](https://clickhouse.com/docs/en/interfaces/http/) | :white_check_mark: | recommended, defaults to `java.net.HttpURLConnection` and can be changed to `apache http client` and java.net.http.HttpClient`(less stable). Note that `apache http client` support socket options.| | | [gRPC](https://clickhouse.com/docs/en/interfaces/grpc/) | :white_check_mark: | still experimental, works with 22.3+, known to has [issue](https://github.com/ClickHouse/ClickHouse/issues/28671#issuecomment-1087049993) when using LZ4 compression | | | [TCP/Native](https://clickhouse.com/docs/en/interfaces/tcp/) | :white_check_mark: | `clickhouse-cli-client`(wrapper of ClickHouse native command-line client) was added in 0.3.2-patch10, `clickhouse-tcp-client` will be available in 0.3.3 | | | [Local/File](https://clickhouse.com/docs/en/operations/utilities/clickhouse-local/) | :x: | `clickhouse-cli-client` will be enhanced to support `clickhouse-local` | diff --git a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/JdbcDriver.java b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/JdbcDriver.java index f04ad56a4..be078b58e 100644 --- a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/JdbcDriver.java +++ b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/jdbc/JdbcDriver.java @@ -17,6 +17,9 @@ public enum JdbcDriver { ClickhouseHttpClientJdbc("com.clickhouse.jdbc.ClickHouseDriver", "jdbc:ch://%s:%s/%s?http_connection_provider=HTTP_CLIENT&ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&compress=%s%s", Constants.HTTP_PORT), + ClickhouseApacheHttpClientJdbc("com.clickhouse.jdbc.ClickHouseDriver", + "jdbc:ch://%s:%s/%s?http_connection_provider=APACHE_HTTP_CLIENT&ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&compress=%s%s", + Constants.HTTP_PORT), // default gRPC implementation ClickhouseGrpcJdbc("com.clickhouse.jdbc.ClickHouseDriver", "jdbc:ch:grpc://%s:%s/%s?ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&max_inbound_message_size=2147483647&compress=%s%s", diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java index 9a642de01..e63a19110 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java @@ -323,7 +323,13 @@ public enum ClickHouseClientOption implements ClickHouseOption { * false. */ USE_TIME_ZONE("use_time_zone", "", "Time zone of all DateTime* values. " - + "Only used when use_server_time_zone is false. Empty value means client time zone."); + + "Only used when use_server_time_zone is false. Empty value means client time zone."), + + /** + * Socket IP_TOS option which indicates IP package priority. + */ + IP_TOS("socket_op_ip_tos", 0, "Socket IP_TOS option which indicates IP package priority."); + private final String key; private final Serializable defaultValue; diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index f16c7432c..b884b64e5 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -1,5 +1,33 @@ package com.clickhouse.client; +import com.clickhouse.client.ClickHouseClientBuilder.Agent; +import com.clickhouse.client.ClickHouseTransaction.XID; +import com.clickhouse.client.config.ClickHouseBufferingMode; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.config.ClickHouseRenameMethod; +import com.clickhouse.client.config.ClickHouseSslMode; +import com.clickhouse.client.data.BinaryStreamUtils; +import com.clickhouse.client.data.ClickHouseBigDecimalValue; +import com.clickhouse.client.data.ClickHouseBigIntegerValue; +import com.clickhouse.client.data.ClickHouseByteValue; +import com.clickhouse.client.data.ClickHouseDateTimeValue; +import com.clickhouse.client.data.ClickHouseEnumValue; +import com.clickhouse.client.data.ClickHouseExternalTable; +import com.clickhouse.client.data.ClickHouseIntegerValue; +import com.clickhouse.client.data.ClickHouseIpv4Value; +import com.clickhouse.client.data.ClickHouseIpv6Value; +import com.clickhouse.client.data.ClickHouseLongValue; +import com.clickhouse.client.data.ClickHouseOffsetDateTimeValue; +import com.clickhouse.client.data.ClickHouseStringValue; +import com.clickhouse.client.data.UnsignedByte; +import com.clickhouse.client.data.UnsignedInteger; +import com.clickhouse.client.data.UnsignedLong; +import com.clickhouse.client.data.UnsignedShort; +import org.testng.Assert; +import org.testng.SkipException; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -33,35 +61,6 @@ import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import com.clickhouse.client.ClickHouseClientBuilder.Agent; -import com.clickhouse.client.ClickHouseTransaction.XID; -import com.clickhouse.client.config.ClickHouseBufferingMode; -import com.clickhouse.client.config.ClickHouseClientOption; -import com.clickhouse.client.config.ClickHouseRenameMethod; -import com.clickhouse.client.config.ClickHouseSslMode; -import com.clickhouse.client.data.BinaryStreamUtils; -import com.clickhouse.client.data.ClickHouseBigDecimalValue; -import com.clickhouse.client.data.ClickHouseBigIntegerValue; -import com.clickhouse.client.data.ClickHouseByteValue; -import com.clickhouse.client.data.ClickHouseDateTimeValue; -import com.clickhouse.client.data.ClickHouseEnumValue; -import com.clickhouse.client.data.ClickHouseExternalTable; -import com.clickhouse.client.data.ClickHouseIntegerValue; -import com.clickhouse.client.data.ClickHouseIpv4Value; -import com.clickhouse.client.data.ClickHouseIpv6Value; -import com.clickhouse.client.data.ClickHouseLongValue; -import com.clickhouse.client.data.ClickHouseOffsetDateTimeValue; -import com.clickhouse.client.data.ClickHouseStringValue; -import com.clickhouse.client.data.UnsignedByte; -import com.clickhouse.client.data.UnsignedInteger; -import com.clickhouse.client.data.UnsignedLong; -import com.clickhouse.client.data.UnsignedShort; - -import org.testng.Assert; -import org.testng.SkipException; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - public abstract class ClientIntegrationTest extends BaseIntegrationTest { protected void checkRowCount(String queryOrTableName, int expectedRowCount) throws ClickHouseException { try (ClickHouseClient client = getClient()) { diff --git a/clickhouse-http-client/pom.xml b/clickhouse-http-client/pom.xml index 426c8cc61..20758bf88 100644 --- a/clickhouse-http-client/pom.xml +++ b/clickhouse-http-client/pom.xml @@ -29,6 +29,10 @@ org.lz4 lz4-java + + org.apache.httpcomponents.client5 + httpclient5 + diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java new file mode 100644 index 000000000..7cfd62344 --- /dev/null +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java @@ -0,0 +1,427 @@ +package com.clickhouse.client.http; + +import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseConfig; +import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseInputStream; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseSslContextProvider; +import com.clickhouse.client.ClickHouseUtils; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.config.ClickHouseSslMode; +import com.clickhouse.client.data.ClickHouseExternalTable; +import com.clickhouse.client.http.config.ClickHouseHttpOption; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; +import org.apache.hc.client5.http.socket.ConnectionSocketFactory; +import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.ProtocolException; +import org.apache.hc.core5.http.config.Registry; +import org.apache.hc.core5.http.config.RegistryBuilder; +import org.apache.hc.core5.http.io.SocketConfig; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.util.Timeout; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.ConnectException; +import java.net.Socket; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TimeZone; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLException; + +/** + * Created by wujianchao on 2022/12/1. + */ +public class ApacheHttpConnectionImpl extends ClickHouseHttpConnection { + + private static final Logger log = LoggerFactory.getLogger(ApacheHttpConnectionImpl.class); + + private static final byte[] HEADER_CONTENT_DISPOSITION = "content-disposition: form-data; name=\"" + .getBytes(StandardCharsets.US_ASCII); + private static final byte[] HEADER_OCTET_STREAM = "content-type: application/octet-stream\r\n" + .getBytes(StandardCharsets.US_ASCII); + private static final byte[] HEADER_BINARY_ENCODING = "content-transfer-encoding: binary\r\n\r\n" + .getBytes(StandardCharsets.US_ASCII); + + private static final byte[] DOUBLE_DASH = new byte[] {'-', '-'}; + private static final byte[] END_OF_NAME = new byte[] {'"', '\r', '\n'}; + private static final byte[] LINE_PREFIX = new byte[] {'\r', '\n', '-', '-'}; + private static final byte[] LINE_SUFFIX = new byte[] {'\r', '\n'}; + + private static final byte[] SUFFIX_QUERY = "query\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); + private static final byte[] SUFFIX_FORMAT = "_format\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); + private static final byte[] SUFFIX_STRUCTURE = "_structure\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); + private static final byte[] SUFFIX_FILENAME = "\"; filename=\"".getBytes(StandardCharsets.US_ASCII); + + private final CloseableHttpClient client; + private final AtomicBoolean isBusy = new AtomicBoolean(false); + + protected ApacheHttpConnectionImpl(ClickHouseNode server, ClickHouseRequest request, ExecutorService executor) + throws IOException, URISyntaxException { + super(server, request); + client = newConnection(); + } + + private CloseableHttpClient newConnection() throws IOException { + RegistryBuilder r = RegistryBuilder.create() + .register("http", SocketFactory.create(config)); + if (config.isSsl()) { + r.register("https", SSLSocketFactory.create(config)); + } + return HttpClientBuilder.create() + .setConnectionManager(new HttpConnectionManager(r.build(), config)).build(); + } + + private ClickHouseHttpResponse buildResponse(CloseableHttpResponse response, ClickHouseConfig config, Runnable postCloseAction) + throws IOException { + // X-ClickHouse-Server-Display-Name: xxx + // X-ClickHouse-Query-Id: xxx + // X-ClickHouse-Format: RowBinaryWithNamesAndTypes + // X-ClickHouse-Timezone: UTC + // X-ClickHouse-Summary: + // {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"} + String displayName = getResponseHeader(response, "X-ClickHouse-Server-Display-Name", server.getHost()); + String queryId = getResponseHeader(response, "X-ClickHouse-Query-Id", ""); + String summary = getResponseHeader(response, "X-ClickHouse-Summary", "{}"); + + ClickHouseConfig c = config; + ClickHouseFormat format = c.getFormat(); + TimeZone timeZone = c.getServerTimeZone(); + boolean hasOutputFile = output != null && output.getUnderlyingFile().isAvailable(); + boolean hasQueryResult = false; + // queryId, format and timeZone are only available for queries + if (!ClickHouseChecker.isNullOrEmpty(queryId)) { + String value = getResponseHeader(response, "X-ClickHouse-Format", ""); + if (!ClickHouseChecker.isNullOrEmpty(value)) { + format = ClickHouseFormat.valueOf(value); + hasQueryResult = true; + } + value = getResponseHeader(response, "X-ClickHouse-Timezone", ""); + timeZone = !ClickHouseChecker.isNullOrEmpty(value) ? TimeZone.getTimeZone(value) + : timeZone; + } + + final InputStream source; + final Runnable action; + if (output != null) { + source = ClickHouseInputStream.empty(); + action = () -> { + try (OutputStream o = output) { + ClickHouseInputStream.pipe(response.getEntity().getContent(), o, c.getWriteBufferSize()); + if (postCloseAction != null) { + postCloseAction.run(); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to redirect response to given output stream", e); + } + }; + } else { + source = response.getEntity().getContent(); + action = postCloseAction; + } + return new ClickHouseHttpResponse(this, + hasOutputFile ? ClickHouseInputStream.of(source, c.getReadBufferSize(), action) + : (hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action) + : ClickHouseClient.getResponseInputStream(c, source, action)), + displayName, queryId, summary, format, timeZone); + } + + private String getResponseHeader(CloseableHttpResponse response, String header, String defaultValue) { + Header h = response.getFirstHeader(header); + return h == null ? defaultValue : h.getValue(); + } + + private void setHeaders(HttpRequest request, Map headers) { + headers = mergeHeaders(headers); + + if (headers != null && !headers.isEmpty()) { + for (Map.Entry header : headers.entrySet()) { + request.setHeader(header.getKey(), header.getValue()); + } + } + } + + private void checkResponse(CloseableHttpResponse response) throws IOException { + if (response.getEntity() == null) { + throw new ConnectException( + ClickHouseUtils.format("HTTP response %d, %s", response.getCode(), response.getReasonPhrase())); + } + + if (response.getCode() == 200) { + return; + } + + Header errorCode = response.getFirstHeader("X-ClickHouse-Exception-Code"); + Header serverName = response.getFirstHeader("X-ClickHouse-Server-Display-Name"); + + String errorMsg; + + int bufferSize = (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(); + ByteArrayOutputStream output = new ByteArrayOutputStream(bufferSize); + ClickHouseInputStream.pipe(response.getEntity().getContent(), output, bufferSize); + byte[] bytes = output.toByteArray(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader( + ClickHouseClient.getResponseInputStream(config, new ByteArrayInputStream(bytes), null), + StandardCharsets.UTF_8))) { + StringBuilder builder = new StringBuilder(); + while ((errorMsg = reader.readLine()) != null) { + builder.append(errorMsg).append('\n'); + } + errorMsg = builder.toString(); + + } catch (IOException e) { + log.debug("Failed to read error message[code=%s] from server [%s] due to: %s", + errorCode.getValue(), + serverName.getValue(), + e.getMessage()); + throw e; + } + throw new IOException(errorMsg); + } + + @Override + protected boolean isReusable() { + return true; + } + + protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List tables, + String url, Map headers, ClickHouseConfig config, + Runnable postCloseAction) + throws IOException { + // Connection is reusable, ensure that only one request is on fly. + if (!isBusy.compareAndSet(false, true)) + throw new IOException("Connection is busy"); + + HttpPost post = new HttpPost(url == null ? this.url : url); + setHeaders(post, headers); + byte[] boundary = null; + String contentType = "text/plain; charset=UTF-8"; + + if (tables != null && !tables.isEmpty()) { + String uuid = rm.createUniqueId(); + contentType = "multipart/form-data; boundary=".concat(uuid); + boundary = uuid.getBytes(StandardCharsets.US_ASCII); + } + + post.setHeader("Content-Type", contentType); + + final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable(); + final boolean hasInput = data != null || boundary != null; + + Charset ascii = StandardCharsets.US_ASCII; + Charset utf8 = StandardCharsets.UTF_8; + + List inputParts = new ArrayList<>(); + + byte[] sqlBytes = hasFile ? new byte[0] : sql.getBytes(utf8); + if (boundary != null) { + // head + List head = new ArrayList<>(); + head.add(LINE_PREFIX); + head.add(boundary); + head.add(LINE_SUFFIX); + head.add(HEADER_CONTENT_DISPOSITION); + head.add(SUFFIX_QUERY); + head.add(sqlBytes); + + inputParts.add(new ByteArraysInputStream(head)); + + for (ClickHouseExternalTable t : tables) { + byte[] tableName = t.getName().getBytes(utf8); + // table head + List tableHead = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + tableHead.add(LINE_PREFIX); + tableHead.add(boundary); + tableHead.add(LINE_SUFFIX); + tableHead.add(HEADER_CONTENT_DISPOSITION); + tableHead.add(tableName); + if (i == 0) { + tableHead.add(SUFFIX_FORMAT); + tableHead.add(t.getFormat().name().getBytes(ascii)); + } else if (i == 1) { + tableHead.add(SUFFIX_STRUCTURE); + tableHead.add(t.getStructure().getBytes(utf8)); + } else { + tableHead.add(SUFFIX_FILENAME); + tableHead.add(tableName); + tableHead.add(END_OF_NAME); + break; + } + } + tableHead.add(HEADER_OCTET_STREAM); + tableHead.add(HEADER_BINARY_ENCODING); + inputParts.add(new ByteArraysInputStream(tableHead)); + + // table content + inputParts.add(t.getContent()); + } + // tail + List tail = new ArrayList<>(); + tail.add(LINE_PREFIX); + tail.add(boundary); + tail.add(DOUBLE_DASH); + tail.add(LINE_SUFFIX); + inputParts.add(new ByteArraysInputStream(tail)); + + } else { + List content = new ArrayList<>(); + content.add(sqlBytes); + if (data != null && data.available() > 0) { + // append \n + if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') { + content.add(new byte[] {'\n'}); + } + inputParts.add(new ByteArraysInputStream(content)); + inputParts.add(data); + } else { + inputParts.add(new ByteArraysInputStream(content)); + } + } + + ClickHouseInputStream input = ClickHouseInputStream.of(inputParts, InputStream.class, null, null); + + String contentEncoding = headers == null ? null : headers.getOrDefault("content-encoding", null); + ClickHouseHttpEntity postBody = + new ClickHouseHttpEntity(input, config, contentType, contentEncoding, hasFile, hasInput); + + post.setEntity(postBody); + CloseableHttpResponse response = client.execute(post); + + checkResponse(response); + // buildResponse should use the config of current request in case of reusable connection. + return buildResponse(response, config, () -> { + isBusy.compareAndSet(true, false); + if (postCloseAction != null) { + postCloseAction.run(); + } + }); + + } + + @Override + public boolean ping(int timeout) { + String url = getBaseUrl().concat("ping"); + HttpGet ping = new HttpGet(url); + + try (CloseableHttpClient httpClient = newConnection(); + CloseableHttpResponse response = httpClient.execute(ping)) { + // TODO set timeout + checkResponse(response); + String ok = config.getStrOption(ClickHouseHttpOption.DEFAULT_RESPONSE); + return ok.equals(EntityUtils.toString(response.getEntity())); + + } catch (Exception e) { + log.debug("Failed to ping url %s due to: %s", url, e.getMessage()); + } + + return false; + } + + @Override + public void close() throws IOException { + client.close(); + } + + static class SocketFactory extends PlainConnectionSocketFactory { + private final ClickHouseConfig config; + + private SocketFactory(ClickHouseConfig config) { + this.config = config; + } + + @Override + public Socket createSocket(final HttpContext context) throws IOException { + Socket sock = new Socket(); + sock.setTrafficClass(config.getOption(ClickHouseClientOption.IP_TOS, Integer.class)); + // TODO 1.more socket options + // TODO 2.If connection is in reusable mode and + // use different socket options in different requests, + // new options will not work + return sock; + } + + public static SocketFactory create(ClickHouseConfig config) { + return new SocketFactory(config); + } + } + + static class SSLSocketFactory extends SSLConnectionSocketFactory { + private final ClickHouseConfig config; + + private SSLSocketFactory(ClickHouseConfig config) throws SSLException { + super(Objects.requireNonNull( + ClickHouseSslContextProvider.getProvider().getSslContext(SSLContext.class, config) + .orElse(SSLContexts.createDefault())), + config.getSslMode() == ClickHouseSslMode.STRICT + ? HttpsURLConnection.getDefaultHostnameVerifier() + : (hostname, session) -> true + ); + this.config = config; + } + + @Override + public Socket createSocket(HttpContext context) throws IOException { + Socket sock = new Socket(); + sock.setTrafficClass(config.getOption(ClickHouseClientOption.IP_TOS, Integer.class)); + // TODO more socket options + return sock; + } + + public static SSLSocketFactory create(ClickHouseConfig config) throws SSLException { + return new SSLSocketFactory(config); + } + } + + static class HttpConnectionManager extends BasicHttpClientConnectionManager { + public HttpConnectionManager(Registry socketFactory, ClickHouseConfig config) + throws SSLException { + super(socketFactory); + + ConnectionConfig connConfig = ConnectionConfig.custom() + .setConnectTimeout(Timeout.of(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)) + .build(); + setConnectionConfig(connConfig); + SocketConfig socketConfig = SocketConfig.custom() + .setSoTimeout(Timeout.of(config.getSocketTimeout(), TimeUnit.MILLISECONDS)) + .setRcvBufSize(config.getReadBufferSize()) + .setSndBufSize(config.getWriteBufferSize()) + .build(); + setSocketConfig(socketConfig); + } + } + +} diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ByteArraysInputStream.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ByteArraysInputStream.java new file mode 100644 index 000000000..fa2c71fef --- /dev/null +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ByteArraysInputStream.java @@ -0,0 +1,212 @@ +package com.clickhouse.client.http; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Wraps some byte arrays into an input stream. + * It behaves in a zero copy way. + */ +public class ByteArraysInputStream extends InputStream { + + /** + * An array of bytes that was provided by the creator of the stream. + * Elements buf[0] through buf[count-1] are the only bytes that can ever be + * read from the stream; + */ + protected final List bufs; + + /** + * Current buf index + */ + protected int current; + /** + * position of current buf + */ + protected int currentPos; + + /** + * Position of bufs + */ + protected int pos; + + /** + * The currently marked position in the stream. + * The current buffer position is set to this point by the + * reset() method. + */ + protected int mark = 0; + + /** + * Total byte size in bufs. + */ + protected int count; + + public ByteArraysInputStream(List bufs) { + Objects.requireNonNull(bufs, "bufs"); + this.bufs = bufs; + this.pos = 0; + this.current = 0; + this.currentPos = 0; + int totalCount = 0; + for (byte[] buf : bufs) { + Objects.requireNonNull(buf, "element of bufs"); + totalCount += buf.length; + } + this.count = totalCount; + } + + public static ByteArraysInputStream of(List bufs) { + return new ByteArraysInputStream(bufs); + } + + public static ByteArraysInputStream of(byte[] ...bufs) { + return new ByteArraysInputStream(Arrays.asList(bufs)); + } + + /** + * Reads the next byte of data from this input stream. + */ + public synchronized int read() { + if (pos >= count) + return -1; + while (currentPos == bufs.get(current).length) { + current++; + currentPos = 0; + } + int ret = bufs.get(current)[currentPos] & 0xff; + + currentPos++; + pos++; + + return ret; + } + + /** + * Reads up to len bytes of data into an array of bytes + * from this input stream. + * + * @return the number bytes read or -1 if read nothing. + */ + public synchronized int read(byte b[], int off, int len) { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + + if (pos >= count) { + return -1; + } + + int avail = count - pos; + if (len > avail) { + len = avail; + } + if (len <= 0) { // TODO remove + return 0; + } + int remaining = len; + for (int i=current; i= remaining) { + System.arraycopy(buf, currentPos, b, off, remaining); + currentPos += remaining; + break; + } else { + System.arraycopy(buf, currentPos, b, off, buf.length - currentPos); + remaining -= (buf.length - currentPos); + off += (buf.length - currentPos); + current++; + currentPos = 0; + } + } + pos += len; + return len; + } + + /** + * Skips n bytes of input from this input stream. Fewer + * bytes might be skipped if the end of the input stream is reached. + * The actual number k + * of bytes to be skipped is equal to the smaller + * of n and count-pos. + * The value k is added into pos + * and k is returned. + * + * @return the actual number of bytes skipped. + */ + public synchronized long skip(long n) { + long k = count - pos; + if (n < k) { + k = n < 0 ? 0 : n; + } + + long remaining = k; + for (int i=current; i= remaining) { + currentPos += remaining; + break; + } else { + remaining -= (buf.length - currentPos); + current++; + currentPos = 0; + } + } + pos += k; + return k; + } + + /** + * Returns the number of remaining bytes that can be read (or skipped over) + * from this input stream. + *

+ * The value returned is count - pos, + * which is the number of bytes remaining to be read from the input buffer. + * + * @return the number of remaining bytes that can be read (or skipped + * over) from this input stream without blocking. + */ + public synchronized int available() { + return count - pos; + } + + public boolean markSupported() { + return true; + } + + /** + * Mark the pos and you can reset to it. + */ + public synchronized void mark(int readAheadLimit) { + mark = pos; + } + + /** + * Resets the buffer to the marked position. The marked position + * is 0 unless another position was marked or an offset was specified + * in the constructor. + */ + public synchronized void reset() { + int remaining = mark; + for (int i=0; i= remaining) { + currentPos = remaining; + current = i; + break; + } else { + remaining -= buf.length; + } + } + pos = mark; + } + + public void close() throws IOException { + // do nothing. + } +} diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java index a3c18ec5b..57cae8722 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java @@ -5,11 +5,24 @@ import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.http.config.ClickHouseHttpOption; +import com.clickhouse.client.http.config.HttpConnectionProvider; public final class ClickHouseHttpConnectionFactory { public static ClickHouseHttpConnection createConnection(ClickHouseNode server, ClickHouseRequest request, ExecutorService executor) throws IOException { - return new HttpUrlConnectionImpl(server, request, executor); + HttpConnectionProvider provider = request.getConfig().getOption(ClickHouseHttpOption.CONNECTION_PROVIDER, + HttpConnectionProvider.class); + + try { + return provider == null || provider == HttpConnectionProvider.HTTP_URL_CONNECTION + ? new HttpUrlConnectionImpl(server, request, executor) + : new ApacheHttpConnectionImpl(server, request, executor); + } catch (IOException e) { + throw e; + } catch (Throwable t) { + return new HttpUrlConnectionImpl(server, request, executor); + } } private ClickHouseHttpConnectionFactory() { diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpEntity.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpEntity.java new file mode 100644 index 000000000..1fd01860f --- /dev/null +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpEntity.java @@ -0,0 +1,90 @@ +package com.clickhouse.client.http; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseConfig; +import com.clickhouse.client.ClickHouseInputStream; +import com.clickhouse.client.ClickHouseOutputStream; +import org.apache.hc.core5.http.io.entity.AbstractHttpEntity; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Objects; + +/** + * Used to encapsulate post request. + */ +public class ClickHouseHttpEntity extends AbstractHttpEntity { + + /** + * Data to send + */ + private final ClickHouseInputStream in; + private final ClickHouseConfig config; + /** + * Indicate that there is extra data which comes from file. + */ + private final boolean hasFile; + /** + * Indicate that there is extra data which comes from external tables. + */ + private final boolean hasInput; + + public ClickHouseHttpEntity(ClickHouseInputStream in, ClickHouseConfig config, String contentType, + String contentEncoding, + boolean hasFile, boolean hasInput) { + super(contentType, contentEncoding, hasInput); + this.in = in; + this.config = config; + this.hasFile = hasFile; + this.hasInput = hasInput; + } + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public long getContentLength() { + return -1; + } + + @Override + public InputStream getContent() throws IOException, UnsupportedOperationException { + return in; + } + + @Override + public void writeTo(OutputStream outStream) throws IOException { + Objects.requireNonNull(outStream, "outStream"); + try { + OutputStream wrappedOut = hasFile + ? ClickHouseOutputStream.of(outStream, config.getWriteBufferSize()) + : (hasInput + ? ClickHouseClient.getAsyncRequestOutputStream(config, outStream, null) + : ClickHouseClient.getRequestOutputStream(config, outStream, null) + ); + final byte[] buffer = new byte[config.getBufferSize()]; + int readLen; + while ((readLen = in.read(buffer)) != -1) { + wrappedOut.write(buffer, 0, readLen); + } + wrappedOut.flush(); + } finally { + in.close(); + } + } + + @Override + public boolean isStreaming() { + return false; + } + + @Override + public void close() throws IOException { + if (in != null) { + in.close(); + } + } +} diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java index 4d91a0649..5818ec41d 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java @@ -1,10 +1,10 @@ package com.clickhouse.client.http.config; -import java.io.Serializable; - import com.clickhouse.client.ClickHouseChecker; import com.clickhouse.client.config.ClickHouseOption; +import java.io.Serializable; + /** * Http client options. */ diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/HttpConnectionProvider.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/HttpConnectionProvider.java index 2eba73bdf..76218d147 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/HttpConnectionProvider.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/HttpConnectionProvider.java @@ -2,5 +2,6 @@ public enum HttpConnectionProvider { HTTP_CLIENT, - HTTP_URL_CONNECTION + HTTP_URL_CONNECTION, + APACHE_HTTP_CLIENT } diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java index 9bd6d54b0..5396f11c6 100644 --- a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java +++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseHttpConnectionFactory.java @@ -17,7 +17,9 @@ public static ClickHouseHttpConnection createConnection(ClickHouseNode server, C try { return provider == null || provider == HttpConnectionProvider.HTTP_URL_CONNECTION ? new HttpUrlConnectionImpl(server, request, executor) - : new HttpClientConnectionImpl(server, request, executor); + : provider == HttpConnectionProvider.HTTP_CLIENT + ? new HttpClientConnectionImpl(server, request, executor) + : new ApacheHttpConnectionImpl(server, request, executor); } catch (IOException e) { throw e; } catch (Throwable t) { diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java new file mode 100644 index 000000000..7ed1c24ca --- /dev/null +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java @@ -0,0 +1,38 @@ +package com.clickhouse.client.http; + +import com.clickhouse.client.BaseIntegrationTest; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.http.config.ClickHouseHttpOption; +import com.clickhouse.client.http.config.HttpConnectionProvider; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ApacheHttpConnectionImplTest extends BaseIntegrationTest { + @Test(groups = { "integration" }) + public void testConnection() throws Exception { + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + try (ClickHouseClient client = ClickHouseClient.newInstance()) { + + ClickHouseRequest req1 = client.connect(server); + try (ClickHouseResponse resp = req1 + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, HttpConnectionProvider.APACHE_HTTP_CLIENT) + .query("select 1").executeAndWait()) { + Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1"); + } + + // req2 will use same connection with req1 + ClickHouseRequest req2 = client.connect(server); + try (ClickHouseResponse resp = req2 + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, HttpConnectionProvider.APACHE_HTTP_CLIENT) + .query("select 1").executeAndWait()) { + Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1"); + } + + } + } +} diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ByteArraysInputStreamTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ByteArraysInputStreamTest.java new file mode 100644 index 000000000..14a018694 --- /dev/null +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ByteArraysInputStreamTest.java @@ -0,0 +1,177 @@ +package com.clickhouse.client.http; + +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.List; + +public class ByteArraysInputStreamTest { + + @Test(groups = { "unit" }) + public void testInvalidValue() { + Assert.assertThrows(NullPointerException.class, + () -> ByteArraysInputStream.of((List) null)); + Assert.assertThrows(NullPointerException.class, + () -> ByteArraysInputStream.of(new byte[] {1}, null)); + } + + @Test(groups = { "unit" }) + public void testNewInstance() { + ByteArraysInputStream in1 = ByteArraysInputStream.of(new byte[] {1,2,3}); + Assert.assertEquals(in1.pos, 0); + Assert.assertEquals(in1.count, 3); + Assert.assertEquals(in1.mark, 0); + Assert.assertEquals(in1.current, 0); + Assert.assertEquals(in1.currentPos, 0); + + ByteArraysInputStream in2 = ByteArraysInputStream.of(new byte[] {1,2,3}, new byte[] {4,5,6}); + Assert.assertEquals(in2.pos, 0); + Assert.assertEquals(in2.count, 6); + Assert.assertEquals(in2.mark, 0); + Assert.assertEquals(in2.current, 0); + Assert.assertEquals(in2.currentPos, 0); + } + + @DataProvider(name = "getIn") + protected Object[][] getIn() { + return new Object[][] { + { ByteArraysInputStream.of(new byte[] {}) }, + { ByteArraysInputStream.of(new byte[] {1,2,3}) }, + { ByteArraysInputStream.of(new byte[] {1,2,3}, new byte[] {4,5,6}) }, + { ByteArraysInputStream.of(new byte[] {1,2,3}, new byte[0], new byte[] {4,5,6}) }, + }; + } + + @Test(dataProvider = "getIn", groups = { "unit" }) + public void testRead(ByteArraysInputStream in) { + for (int i=0; i getClientClass() { return ClickHouseHttpClient.class; } - @Test(groups = "integration") - public void testAuthentication() throws ClickHouseException { + @DataProvider(name = "connectionProvider") + protected Object[][] getConnectionProvider() { + return new Object[][] { + { HttpConnectionProvider.HTTP_URL_CONNECTION }, + { HttpConnectionProvider.APACHE_HTTP_CLIENT } + }; + } + + @Test(dataProvider = "connectionProvider", groups = "integration") + public void testAuthentication(HttpConnectionProvider connProvider) throws ClickHouseException { String sql = "select currentUser()"; try (ClickHouseClient client = getClient( new ClickHouseConfig(null, ClickHouseCredentials.fromUserAndPassword("dba", "dba"), null, null)); ClickHouseResponse response = client .connect(getServer()) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) // .option(ClickHouseHttpOption.CUSTOM_PARAMS, "user=dba,password=incorrect") .query(sql).executeAndWait()) { Assert.assertEquals(response.firstRecord().getValue(0).asString(), "dba"); @@ -54,6 +68,7 @@ public void testAuthentication() throws ClickHouseException { try (ClickHouseClient client = getClient(); ClickHouseResponse response = client .connect(getServer()) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .option(ClickHouseHttpOption.CUSTOM_HEADERS, "Authorization=Basic ZGJhOmRiYQ==") // .option(ClickHouseHttpOption.CUSTOM_PARAMS, "user=dba,password=incorrect") .query(sql).executeAndWait()) { @@ -64,21 +79,23 @@ public void testAuthentication() throws ClickHouseException { ClickHouseResponse response = client .connect(getServer(ClickHouseNode .of("http://localhost?custom_http_headers=aUthorization%3DBasic%20ZGJhOmRiYQ%3D%3D"))) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .query(sql).executeAndWait()) { Assert.assertEquals(response.firstRecord().getValue(0).asString(), "dba"); } } - @Test(groups = "integration") - @Override - public void testSession() throws ClickHouseException { + @Test(dataProvider = "connectionProvider", groups = "integration") + public void testSession(HttpConnectionProvider connProvider) throws ClickHouseException { super.testSession(); ClickHouseNode server = getServer(); String sessionId = ClickHouseRequestManager.getInstance().createSessionId(); try (ClickHouseClient client = getClient()) { ClickHouseRequest req = client.connect(server).session(sessionId, true) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .option(ClickHouseHttpOption.CUSTOM_PARAMS, "session_check=0,max_query_size=1000") + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, HttpConnectionProvider.APACHE_HTTP_CLIENT) .transaction(null) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes); try (ClickHouseResponse resp = req.copy() @@ -92,20 +109,24 @@ public void testSession() throws ClickHouseException { } } - @Test(groups = { "integration" }) - public void testPing() { - try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) { + @Test(dataProvider = "connectionProvider", groups = "integration") + public void testPing(HttpConnectionProvider connProvider) { + try (ClickHouseClient client = ClickHouseClient.builder() + .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP)) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider).build()) { Assert.assertTrue(client.ping(getServer(), 3000)); } try (ClickHouseClient client = ClickHouseClient.builder() .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP)) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .option(ClickHouseHttpOption.WEB_CONTEXT, "a/b").build()) { Assert.assertTrue(client.ping(getServer(), 3000)); } try (ClickHouseClient client = ClickHouseClient.builder() .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP)) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .option(ClickHouseHttpOption.WEB_CONTEXT, "a/b") .option(ClickHouseClientOption.HEALTH_CHECK_METHOD, ClickHouseHealthCheckMethod.PING).build()) { Assert.assertFalse(client.ping(getServer(), 3000)); @@ -113,6 +134,7 @@ public void testPing() { try (ClickHouseClient client = ClickHouseClient.builder() .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP)) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .option(ClickHouseHttpOption.WEB_CONTEXT, "/") .option(ClickHouseClientOption.HEALTH_CHECK_METHOD, ClickHouseHealthCheckMethod.PING).build()) { Assert.assertTrue(client.ping(getServer(), 3000)); @@ -120,6 +142,7 @@ public void testPing() { try (ClickHouseClient client = ClickHouseClient.builder() .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP)) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .option(ClickHouseClientOption.HEALTH_CHECK_METHOD, ClickHouseHealthCheckMethod.PING) .removeOption(ClickHouseHttpOption.WEB_CONTEXT).build()) { Assert.assertTrue(client.ping(getServer(), 3000)); @@ -138,8 +161,8 @@ public void testTransaction() throws ClickHouseException { testImplicitTransaction(); } - @Test // (groups = "integration") - public void testSslClientAuth() throws ClickHouseException { + @Test(dataProvider = "connectionProvider")// (groups = "integration") + public void testSslClientAuth(HttpConnectionProvider connProvider) throws ClickHouseException { // NPE on JDK 8: // java.lang.NullPointerException // at sun.security.provider.JavaKeyStore.convertToBytes(JavaKeyStore.java:822) @@ -156,17 +179,20 @@ public void testSslClientAuth() throws ClickHouseException { // com.clickhouse.client.config.ClickHouseDefaultSslContextProvider.getKeyStore(ClickHouseDefaultSslContextProvider.java:105) ClickHouseNode server = getSecureServer(ClickHouseProtocol.HTTP); try (ClickHouseClient client = getSecureClient(); - ClickHouseResponse response = client.connect(server).query("select 123").executeAndWait()) { + ClickHouseResponse response = client.connect(server) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) + .query("select 123").executeAndWait()) { Assert.assertEquals(response.firstRecord().getValue(0).asInteger(), 123); } } - @Test(groups = { "integration" }) - public void testCreateTableAsSelect() throws ClickHouseException { + @Test(dataProvider = "connectionProvider", groups = { "integration" }) + public void testCreateTableAsSelect(HttpConnectionProvider connProvider) throws ClickHouseException { ClickHouseNode server = getServer(); sendAndWait(server, "drop table if exists test_create_table_as_select"); try (ClickHouseClient client = getClient()) { - ClickHouseRequest request = client.connect(server); + ClickHouseRequest request = client.connect(server) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider); try (ClickHouseResponse resp = request.write() .external(ClickHouseExternalTable.builder().name("myExtTable").addColumn("s", "String") .addColumn("i", "Int32").content(ClickHouseInputStream.of("one,1\ntwo,2")) @@ -190,16 +216,17 @@ public void testCreateTableAsSelect() throws ClickHouseException { } } - @Test(groups = { "integration" }) - @Override - public void testMutation() throws ClickHouseException { + @Test(dataProvider = "connectionProvider", groups = { "integration" }) + public void testMutation(HttpConnectionProvider connProvider) throws ClickHouseException { super.testMutation(); ClickHouseNode server = getServer(); sendAndWait(server, "drop table if exists test_http_mutation", "create table test_http_mutation(a String, b Nullable(Int64))engine=Memory"); try (ClickHouseClient client = getClient(); - ClickHouseResponse response = client.connect(server).set("send_progress_in_http_headers", 1) + ClickHouseResponse response = client.connect(server) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) + .set("send_progress_in_http_headers", 1) .query("insert into test_http_mutation select toString(number), number from numbers(1)") .executeAndWait()) { ClickHouseResponseSummary summary = response.getSummary(); @@ -207,12 +234,14 @@ public void testMutation() throws ClickHouseException { } } - @Test(groups = { "integration" }) - public void testLogComment() throws ClickHouseException { + @Test(dataProvider = "connectionProvider", groups = { "integration" }) + public void testLogComment(HttpConnectionProvider connProvider) throws ClickHouseException, IOException { ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); String uuid = UUID.randomUUID().toString(); try (ClickHouseClient client = ClickHouseClient.newInstance()) { - ClickHouseRequest request = client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes); + ClickHouseRequest request = client.connect(server) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes); try (ClickHouseResponse resp = request .query("select version()").executeAndWait()) { if (!ClickHouseVersion.of(resp.firstRecord().getValue(0).asString()).check("[21.2,)")) { @@ -244,14 +273,15 @@ public void testLogComment() throws ClickHouseException { } } - @Test(groups = { "integration" }) - public void testPost() throws ClickHouseException { + @Test(dataProvider = "connectionProvider", groups = { "integration" }) + public void testPost(HttpConnectionProvider connProvider) throws ClickHouseException { ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); try (ClickHouseClient client = ClickHouseClient.builder() .defaultCredentials(ClickHouseCredentials.fromUserAndPassword("foo", "bar")).build()) { // why no detailed error message for this: "select 1,2" try (ClickHouseResponse resp = client.connect(server).compressServerResponse(false) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query("select 1,2").executeAndWait()) { int count = 0; @@ -265,7 +295,9 @@ public void testPost() throws ClickHouseException { } // reuse connection - try (ClickHouseResponse resp = client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + try (ClickHouseResponse resp = client.connect(server) + .option(ClickHouseHttpOption.CONNECTION_PROVIDER, connProvider) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query("select 3,4").executeAndWait()) { int count = 0; for (ClickHouseRecord r : resp.records()) { diff --git a/pom.xml b/pom.xml index 24c1da1e2..2ee80b6c9 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ 1.17.5 7.5 + 5.2 3.0.9 8.0.31 @@ -264,6 +265,11 @@ testng ${testng.version} + + org.apache.httpcomponents.client5 + httpclient5 + ${apache.httpclient.version} + org.mariadb.jdbc