diff --git a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java index 8b8ef6bde..1ecdbd1fb 100644 --- a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java +++ b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseConnectionSettings.java @@ -31,6 +31,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator { /** * for ConnectionManager */ + VALIDATE_AFTER_INACTIVITY_MILLIS("validateAfterInactivityMillis", 3 * 1000, "period of inactivity in milliseconds after which persistent connections must be re-validated, this check helps detect connections that have become stale (half-closed) while kept inactive in the pool. "), TIME_TO_LIVE_MILLIS("timeToLiveMillis", 60 * 1000, ""), DEFAULT_MAX_PER_ROUTE("defaultMaxPerRoute", 500, ""), MAX_TOTAL("maxTotal", 10000, ""), diff --git a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java index 2b144ce03..bfdde7672 100644 --- a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java +++ b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/settings/ClickHouseProperties.java @@ -17,6 +17,7 @@ public class ClickHouseProperties { private int connectionTimeout; private int dataTransferTimeout; private int timeToLiveMillis; + private int validateAfterInactivityMillis; private int defaultMaxPerRoute; private int maxTotal; private int maxRetries; @@ -112,6 +113,7 @@ public ClickHouseProperties(Properties info) { this.connectionTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.CONNECTION_TIMEOUT); this.dataTransferTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT); this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS); + this.validateAfterInactivityMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.VALIDATE_AFTER_INACTIVITY_MILLIS); this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE); this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL); this.maxRetries = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_RETRIES); @@ -182,6 +184,7 @@ public Properties asProperties() { ret.put(ClickHouseConnectionSettings.CONNECTION_TIMEOUT.getKey(), String.valueOf(connectionTimeout)); ret.put(ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT.getKey(), String.valueOf(dataTransferTimeout)); ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis)); + ret.put(ClickHouseConnectionSettings.VALIDATE_AFTER_INACTIVITY_MILLIS.getKey(), String.valueOf(validateAfterInactivityMillis)); ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute)); ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal)); ret.put(ClickHouseConnectionSettings.MAX_RETRIES.getKey(), String.valueOf(maxRetries)); @@ -254,6 +257,7 @@ public ClickHouseProperties(ClickHouseProperties properties) { setConnectionTimeout(properties.connectionTimeout); setDataTransferTimeout(properties.dataTransferTimeout); setTimeToLiveMillis(properties.timeToLiveMillis); + setValidateAfterInactivityMillis(properties.validateAfterInactivityMillis); setDefaultMaxPerRoute(properties.defaultMaxPerRoute); setMaxTotal(properties.maxTotal); setMaxRetries(properties.maxRetries); @@ -582,6 +586,14 @@ public void setTimeToLiveMillis(int timeToLiveMillis) { this.timeToLiveMillis = timeToLiveMillis; } + public int getValidateAfterInactivityMillis() { + return validateAfterInactivityMillis; + } + + public void setValidateAfterInactivityMillis(int validateAfterInactivityMillis) { + this.validateAfterInactivityMillis = validateAfterInactivityMillis; + } + public int getDefaultMaxPerRoute() { return defaultMaxPerRoute; } diff --git a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java index a7fd7956d..782d07896 100644 --- a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java +++ b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilder.java @@ -151,6 +151,7 @@ private PoolingHttpClientConnectionManager getConnectionManager() TimeUnit.MILLISECONDS ); + connectionManager.setValidateAfterInactivity(properties.getValidateAfterInactivityMillis()); connectionManager.setDefaultMaxPerRoute(properties.getDefaultMaxPerRoute()); connectionManager.setMaxTotal(properties.getMaxTotal()); connectionManager.setDefaultConnectionConfig(getConnectionConfig()); diff --git a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java index a04bfada5..a88db92a3 100644 --- a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java +++ b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/util/ClickHouseHttpClientBuilderTest.java @@ -1,6 +1,7 @@ package ru.yandex.clickhouse.util; import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; import org.apache.http.NoHttpResponseException; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; @@ -8,6 +9,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.protocol.BasicHttpContext; import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -176,7 +178,7 @@ private static Object[][] provideAuthUserPasswordTestData() { }; } - private static WireMockServer newServer() { + private static WireMockServer newServer(int delayMillis) { WireMockServer server = new WireMockServer( WireMockConfiguration.wireMockConfig().dynamicPort()); server.start(); @@ -184,10 +186,14 @@ private static WireMockServer newServer() { .willReturn(WireMock.aResponse().withStatus(200).withHeader("Connection", "Keep-Alive") .withHeader("Content-Type", "text/plain; charset=UTF-8") .withHeader("Transfer-Encoding", "chunked").withHeader("Keep-Alive", "timeout=3") - .withBody("OK.........................").withFixedDelay(2))); + .withBody("OK.........................").withFixedDelay(delayMillis))); return server; } + private static WireMockServer newServer() { + return newServer(2); + } + private static void shutDownServerWithDelay(final WireMockServer server, final long delayMs) { new Thread() { public void run() { @@ -203,38 +209,104 @@ public void run() { }.start(); } - // @Test(groups = "unit", dependsOnMethods = { "testWithRetry" }, expectedExceptions = { NoHttpResponseException.class }) - public void testWithoutRetry() throws Exception { - final WireMockServer server = newServer(); + @Test(expectedExceptions = { NoHttpResponseException.class }) + public void testReproduceFailedToResponseProblem() throws Exception { + final WireMockServer server = newServer(2); ClickHouseProperties props = new ClickHouseProperties(); + // Disable retry when "failed to respond" occurs. props.setMaxRetries(0); + // Disable validation to reproduce "failed to respond" problem + props.setValidateAfterInactivityMillis(0); + // Ensure there is exactly one TCP connection in connection pool and therefore be re-used between + // multiple http requests. + props.setMaxTotal(1); + props.setDefaultMaxPerRoute(1); + ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props); CloseableHttpClient client = builder.buildClient(); HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201"); - shutDownServerWithDelay(server, 500); + try { + // Make the 1st http request to establish one tcp connection and keep it in the pool. + { + HttpResponse response = client.execute(post); + EntityUtils.consume(response.getEntity()); + } + + // Close the server, now the pooling tcp connection is half closed. + server.shutdownServer(); + server.stop(); + + // The 2nd http request will re-use the pooling tcp connection which is stale + // and "failed to respond" occurs. + { + HttpResponse response = client.execute(post); + EntityUtils.consume(response.getEntity()); + } + } finally { + client.close(); + } + } + + @Test(expectedExceptions = { HttpHostConnectException.class }) + public void testEnableValidation() throws Exception { + final WireMockServer server = newServer(2); + + ClickHouseProperties props = new ClickHouseProperties(); + // Disable retry when "failed to respond" occurs. + props.setMaxRetries(0); + // Disable validation to reproduce "failed to respond" problem + props.setValidateAfterInactivityMillis(1); + // Ensure there is exactly one TCP connection in connection pool and therefore be re-used between + // multiple http requests. + props.setMaxTotal(1); + props.setDefaultMaxPerRoute(1); + + ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props); + CloseableHttpClient client = builder.buildClient(); + HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201"); try { - client.execute(post); + // Make the 1st http request to establish one tcp connection and keep it in the pool. + { + HttpResponse response = client.execute(post); + EntityUtils.consume(response.getEntity()); + } + + // Sleep a while to wait for the validation reaches inactivity timeout. + Thread.sleep(5); + + // Close the server, now the pooling tcp connection is half closed. + server.shutdownServer(); + server.stop(); + + // The 2nd http request re-uses the pooling tcp connection. + // But the validation checks that the connection has been stale, thus a + // new tcp connection is attempted to establish to the closed server + // which leads to HttpHostConnectException. + { + HttpResponse response = client.execute(post); + EntityUtils.consume(response.getEntity()); + } } finally { client.close(); } } - // @Test(groups = "unit", expectedExceptions = { HttpHostConnectException.class }) + @Test(expectedExceptions = { HttpHostConnectException.class }) public void testWithRetry() throws Exception { - final WireMockServer server = newServer(); + final WireMockServer server = newServer(500); ClickHouseProperties props = new ClickHouseProperties(); - // props.setMaxRetries(3); + props.setMaxRetries(3); ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props); CloseableHttpClient client = builder.buildClient(); HttpContext context = new BasicHttpContext(); context.setAttribute("is_idempotent", Boolean.TRUE); HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%202"); - - shutDownServerWithDelay(server, 500); + + shutDownServerWithDelay(server, 100); try { client.execute(post, context);