diff --git a/client-v2/pom.xml b/client-v2/pom.xml index f7b70f6c5..d2e5e842d 100644 --- a/client-v2/pom.xml +++ b/client-v2/pom.xml @@ -74,6 +74,14 @@ jackson-core 2.17.2 + + io.micrometer + micrometer-core + 1.14.3 + true + compile + + @@ -191,6 +199,11 @@ true true true + + + io.micrometer:* + + org.slf4j diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index fcdcd4b85..79b7289dd 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -39,13 +39,10 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.ClickHouseFormat; -import org.apache.hc.client5.http.ConnectTimeoutException; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.http.ClassicHttpResponse; -import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpStatus; -import org.apache.hc.core5.http.NoHttpResponseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +51,6 @@ import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.ConnectException; -import java.net.SocketTimeoutException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -146,15 +141,22 @@ public class Client implements AutoCloseable { // Server context private String serverVersion; + private Object metrics; private Client(Set endpoints, Map configuration, boolean useNewImplementation, ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) { + this(endpoints, configuration, useNewImplementation, sharedOperationExecutor, columnToMethodMatchingStrategy, null); + } + + private Client(Set endpoints, Map configuration, boolean useNewImplementation, + ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metrics) { this.endpoints = endpoints; this.configuration = configuration; this.readOnlyConfig = Collections.unmodifiableMap(this.configuration); this.endpoints.forEach(endpoint -> { this.serverNodes.add(ClickHouseNode.of(endpoint, this.configuration)); }); + this.metrics = metrics; this.serializers = new ConcurrentHashMap<>(); this.deserializers = new ConcurrentHashMap<>(); @@ -166,7 +168,7 @@ private Client(Set endpoints, Map configuration, boolean this.isSharedOpExecuterorOwned = false; this.sharedOperationExecutor = sharedOperationExecutor; } - this.httpClientHelper = new HttpAPIClientHelper(configuration); + this.httpClientHelper = new HttpAPIClientHelper(configuration, metrics); this.columnToMethodMatchingStrategy = columnToMethodMatchingStrategy; updateServerContext(); } @@ -233,7 +235,7 @@ public static class Builder { private ExecutorService sharedOperationExecutor = null; private ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy; - + private Object metric = null; public Builder() { this.endpoints = new HashSet<>(); this.configuration = new HashMap(); @@ -761,7 +763,7 @@ public Builder useAsyncRequests(boolean async) { * Executor will stay running after {@code Client#close() } is called. It is application responsibility to close * the executor. * @param executorService - executor service for async operations - * @return + * @return */ public Builder setSharedOperationExecutor(ExecutorService executorService) { this.sharedOperationExecutor = executorService; @@ -950,6 +952,19 @@ public Builder useBearerTokenAuth(String bearerToken) { return this; } + /** + * Registers http client metrics with MeterRegistry. + * + * @param registry - metrics registry + * @param name - name of metrics group + * @return same instance of the builder + */ + public Builder registerClientMetrics(Object registry, String name) { + this.metric = registry; + this.configuration.put(ClientConfigProperties.METRICS_GROUP_NAME.getKey(), name); + return this; + } + public Client build() { setDefaults(); @@ -1009,7 +1024,7 @@ public Client build() { } return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor, - this.columnToMethodMatchingStrategy); + this.columnToMethodMatchingStrategy, this.metric); } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java index a35c3a4fc..a24f41443 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java @@ -127,7 +127,10 @@ public enum ClientConfigProperties { * Indicates that data provided for write operation is compressed by application. */ APP_COMPRESSED_DATA("app_compressed_data"), - + /** + * + */ + METRICS_GROUP_NAME("metrics_name"), ; private String key; diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index b1fe151a3..0d9cf4bea 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -21,6 +21,7 @@ import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.client5.http.protocol.HttpClientContext; @@ -57,6 +58,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Method; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.NoRouteToHostException; @@ -94,9 +96,10 @@ public class HttpAPIClientHelper { private final Set defaultRetryCauses; private String defaultUserAgent; - - public HttpAPIClientHelper(Map configuration) { + private Object metricsRegistry; + public HttpAPIClientHelper(Map configuration, Object metricsRegistry) { this.chConfiguration = configuration; + this.metricsRegistry = metricsRegistry; this.httpClient = createHttpClient(); RequestConfig.Builder reqConfBuilder = RequestConfig.custom(); @@ -189,7 +192,6 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext, PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create() .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX); - ConnectionReuseStrategy connectionReuseStrategy = ConnectionReuseStrategy.valueOf(chConfiguration.get("connection_reuse_strategy")); switch (connectionReuseStrategy) { @@ -221,7 +223,19 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext, connMgrBuilder.setConnectionFactory(connectionFactory); connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext)); connMgrBuilder.setDefaultSocketConfig(socketConfig); - return connMgrBuilder.build(); + PoolingHttpClientConnectionManager phccm = connMgrBuilder.build(); + if (metricsRegistry != null ) { + try { + String mGroupName = chConfiguration.getOrDefault(ClientConfigProperties.METRICS_GROUP_NAME.getKey(), + "ch-http-pool"); + Class micrometerLoader = getClass().getClassLoader().loadClass("com.clickhouse.client.api.metrics.MicrometerLoader"); + Method applyMethod = micrometerLoader.getDeclaredMethod("applyPoolingMetricsBinder", Object.class, String.class, PoolingHttpClientConnectionManager.class); + applyMethod.invoke(micrometerLoader, metricsRegistry, mGroupName, phccm); + } catch (Exception e) { + LOG.error("Failed to register metrics", e); + } + } + return phccm; } public CloseableHttpClient createHttpClient() { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/metrics/MicrometerLoader.java b/client-v2/src/main/java/com/clickhouse/client/api/metrics/MicrometerLoader.java new file mode 100644 index 000000000..4856f1567 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/metrics/MicrometerLoader.java @@ -0,0 +1,17 @@ +package com.clickhouse.client.api.metrics; + +import com.clickhouse.client.api.ClientMisconfigurationException; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.httpcomponents.hc5.PoolingHttpClientConnectionManagerMetricsBinder; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; + +public class MicrometerLoader { + + public static void applyPoolingMetricsBinder(Object registry, String metricsGroupName, PoolingHttpClientConnectionManager phccm) { + if (registry instanceof MeterRegistry) { + new PoolingHttpClientConnectionManagerMetricsBinder(phccm, metricsGroupName).bindTo((MeterRegistry) registry); + } else { + throw new ClientMisconfigurationException("Unsupported registry type." + registry.getClass()); + } + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/metrics/MetricsTest.java b/client-v2/src/test/java/com/clickhouse/client/metrics/MetricsTest.java new file mode 100644 index 000000000..6f78af5c4 --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/metrics/MetricsTest.java @@ -0,0 +1,72 @@ +package com.clickhouse.client.metrics; + +import com.clickhouse.client.BaseIntegrationTest; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseServerForTest; +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientConfigProperties; +import com.clickhouse.client.api.enums.Protocol; +import com.clickhouse.client.api.internal.ServerSettings; +import com.clickhouse.client.api.query.QueryResponse; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class MetricsTest extends BaseIntegrationTest { + private MeterRegistry meterRegistry; + @BeforeMethod(groups = {"integration"}) + void setUp() { + meterRegistry = new SimpleMeterRegistry(); + Metrics.globalRegistry.add(meterRegistry); + } + + @AfterMethod(groups = {"integration"}) + void tearDown() { + meterRegistry.clear(); + Metrics.globalRegistry.clear(); + } + + @Test(groups = { "integration" }, enabled = true) + public void testRegisterMetrics() throws Exception { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .serverSetting(ServerSettings.ASYNC_INSERT, "0") + .serverSetting(ServerSettings.WAIT_END_OF_QUERY, "1") + .registerClientMetrics(meterRegistry, "pool-test") + .build()) { + + Gauge totalMax = meterRegistry.get("httpcomponents.httpclient.pool.total.max").gauge(); + Gauge available = meterRegistry.get("httpcomponents.httpclient.pool.total.connections").tags("state", "available").gauge(); + Gauge leased = meterRegistry.get("httpcomponents.httpclient.pool.total.connections").tags("state", "leased").gauge(); + + System.out.println("totalMax:" + totalMax.value() + ", available: " + available.value() + ", leased: " + leased.value()); + Assert.assertEquals((int)totalMax.value(), Integer.parseInt(ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getDefaultValue())); + Assert.assertEquals((int)available.value(), 1); + Assert.assertEquals((int)leased.value(), 0); + + try (QueryResponse response = client.query("SELECT 1").get()) { + Assert.assertEquals((int)available.value(), 0); + Assert.assertEquals((int)leased.value(), 1); + } + + Assert.assertEquals((int)available.value(), 1); + Assert.assertEquals((int)leased.value(), 0); + } + // currently there are only 5 metrics that are monitored by micrometer (out of the box) + assertEquals(meterRegistry.getMeters().size(), 5); + } +}