Skip to content

Commit

Permalink
Merge pull request #760 from ryan-tu/validate_stale_conn
Browse files Browse the repository at this point in the history
Validate stale connection to fix the bug: failed to respond
  • Loading branch information
zhicwu authored Dec 1, 2021
2 parents 8a4aee2 + a265d08 commit 26da7d8
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator {
/**
* for ConnectionManager
*/
VALIDATE_AFTER_INACTIVITY_MILLIS("validateAfterInactivityMillis", 3 * 1000, "period of inactivity in milliseconds after which persistent connections must be re-validated, this check helps detect connections that have become stale (half-closed) while kept inactive in the pool. "),
TIME_TO_LIVE_MILLIS("timeToLiveMillis", 60 * 1000, ""),
DEFAULT_MAX_PER_ROUTE("defaultMaxPerRoute", 500, ""),
MAX_TOTAL("maxTotal", 10000, ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ClickHouseProperties {
private int connectionTimeout;
private int dataTransferTimeout;
private int timeToLiveMillis;
private int validateAfterInactivityMillis;
private int defaultMaxPerRoute;
private int maxTotal;
private int maxRetries;
Expand Down Expand Up @@ -112,6 +113,7 @@ public ClickHouseProperties(Properties info) {
this.connectionTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.CONNECTION_TIMEOUT);
this.dataTransferTimeout = (Integer)getSetting(info, ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT);
this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS);
this.validateAfterInactivityMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.VALIDATE_AFTER_INACTIVITY_MILLIS);
this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE);
this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL);
this.maxRetries = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_RETRIES);
Expand Down Expand Up @@ -182,6 +184,7 @@ public Properties asProperties() {
ret.put(ClickHouseConnectionSettings.CONNECTION_TIMEOUT.getKey(), String.valueOf(connectionTimeout));
ret.put(ClickHouseConnectionSettings.DATA_TRANSFER_TIMEOUT.getKey(), String.valueOf(dataTransferTimeout));
ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis));
ret.put(ClickHouseConnectionSettings.VALIDATE_AFTER_INACTIVITY_MILLIS.getKey(), String.valueOf(validateAfterInactivityMillis));
ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute));
ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal));
ret.put(ClickHouseConnectionSettings.MAX_RETRIES.getKey(), String.valueOf(maxRetries));
Expand Down Expand Up @@ -254,6 +257,7 @@ public ClickHouseProperties(ClickHouseProperties properties) {
setConnectionTimeout(properties.connectionTimeout);
setDataTransferTimeout(properties.dataTransferTimeout);
setTimeToLiveMillis(properties.timeToLiveMillis);
setValidateAfterInactivityMillis(properties.validateAfterInactivityMillis);
setDefaultMaxPerRoute(properties.defaultMaxPerRoute);
setMaxTotal(properties.maxTotal);
setMaxRetries(properties.maxRetries);
Expand Down Expand Up @@ -582,6 +586,14 @@ public void setTimeToLiveMillis(int timeToLiveMillis) {
this.timeToLiveMillis = timeToLiveMillis;
}

public int getValidateAfterInactivityMillis() {
return validateAfterInactivityMillis;
}

public void setValidateAfterInactivityMillis(int validateAfterInactivityMillis) {
this.validateAfterInactivityMillis = validateAfterInactivityMillis;
}

public int getDefaultMaxPerRoute() {
return defaultMaxPerRoute;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ private PoolingHttpClientConnectionManager getConnectionManager()
TimeUnit.MILLISECONDS
);

connectionManager.setValidateAfterInactivity(properties.getValidateAfterInactivityMillis());
connectionManager.setDefaultMaxPerRoute(properties.getDefaultMaxPerRoute());
connectionManager.setMaxTotal(properties.getMaxTotal());
connectionManager.setDefaultConnectionConfig(getConnectionConfig());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package ru.yandex.clickhouse.util;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -176,18 +178,22 @@ private static Object[][] provideAuthUserPasswordTestData() {
};
}

private static WireMockServer newServer() {
private static WireMockServer newServer(int delayMillis) {
WireMockServer server = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort());
server.start();
server.stubFor(WireMock.post(WireMock.urlPathMatching("/*"))
.willReturn(WireMock.aResponse().withStatus(200).withHeader("Connection", "Keep-Alive")
.withHeader("Content-Type", "text/plain; charset=UTF-8")
.withHeader("Transfer-Encoding", "chunked").withHeader("Keep-Alive", "timeout=3")
.withBody("OK.........................").withFixedDelay(2)));
.withBody("OK.........................").withFixedDelay(delayMillis)));
return server;
}

private static WireMockServer newServer() {
return newServer(2);
}

private static void shutDownServerWithDelay(final WireMockServer server, final long delayMs) {
new Thread() {
public void run() {
Expand All @@ -203,38 +209,104 @@ public void run() {
}.start();
}

// @Test(groups = "unit", dependsOnMethods = { "testWithRetry" }, expectedExceptions = { NoHttpResponseException.class })
public void testWithoutRetry() throws Exception {
final WireMockServer server = newServer();
@Test(expectedExceptions = { NoHttpResponseException.class })
public void testReproduceFailedToResponseProblem() throws Exception {
final WireMockServer server = newServer(2);

ClickHouseProperties props = new ClickHouseProperties();
// Disable retry when "failed to respond" occurs.
props.setMaxRetries(0);
// Disable validation to reproduce "failed to respond" problem
props.setValidateAfterInactivityMillis(0);
// Ensure there is exactly one TCP connection in connection pool and therefore be re-used between
// multiple http requests.
props.setMaxTotal(1);
props.setDefaultMaxPerRoute(1);

ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201");

shutDownServerWithDelay(server, 500);
try {
// Make the 1st http request to establish one tcp connection and keep it in the pool.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}

// Close the server, now the pooling tcp connection is half closed.
server.shutdownServer();
server.stop();

// The 2nd http request will re-use the pooling tcp connection which is stale
// and "failed to respond" occurs.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}
} finally {
client.close();
}
}

@Test(expectedExceptions = { HttpHostConnectException.class })
public void testEnableValidation() throws Exception {
final WireMockServer server = newServer(2);

ClickHouseProperties props = new ClickHouseProperties();
// Disable retry when "failed to respond" occurs.
props.setMaxRetries(0);
// Disable validation to reproduce "failed to respond" problem
props.setValidateAfterInactivityMillis(1);
// Ensure there is exactly one TCP connection in connection pool and therefore be re-used between
// multiple http requests.
props.setMaxTotal(1);
props.setDefaultMaxPerRoute(1);

ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201");

try {
client.execute(post);
// Make the 1st http request to establish one tcp connection and keep it in the pool.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}

// Sleep a while to wait for the validation reaches inactivity timeout.
Thread.sleep(5);

// Close the server, now the pooling tcp connection is half closed.
server.shutdownServer();
server.stop();

// The 2nd http request re-uses the pooling tcp connection.
// But the validation checks that the connection has been stale, thus a
// new tcp connection is attempted to establish to the closed server
// which leads to HttpHostConnectException.
{
HttpResponse response = client.execute(post);
EntityUtils.consume(response.getEntity());
}
} finally {
client.close();
}
}

// @Test(groups = "unit", expectedExceptions = { HttpHostConnectException.class })
@Test(expectedExceptions = { HttpHostConnectException.class })
public void testWithRetry() throws Exception {
final WireMockServer server = newServer();
final WireMockServer server = newServer(500);

ClickHouseProperties props = new ClickHouseProperties();
// props.setMaxRetries(3);
props.setMaxRetries(3);
ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpContext context = new BasicHttpContext();
context.setAttribute("is_idempotent", Boolean.TRUE);
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%202");
shutDownServerWithDelay(server, 500);

shutDownServerWithDelay(server, 100);

try {
client.execute(post, context);
Expand Down

0 comments on commit 26da7d8

Please sign in to comment.