Skip to content

[client-v2] Initial metrics implementation in client v2 using micrometer #2093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions client-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
<artifactId>jackson-core</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.14.3</version>
</dependency>


<!-- Test Dependencies -->
<dependency>
Expand Down Expand Up @@ -135,6 +141,13 @@
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/io.micrometer/micrometer-observation -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation</artifactId>
<version>1.14.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
25 changes: 20 additions & 5 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseFormat;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ClassicHttpResponse;
Expand Down Expand Up @@ -158,15 +159,22 @@ public class Client implements AutoCloseable {

// Server context
private String serverVersion;
private Object metrics;

private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) {
this(endpoints, configuration, useNewImplementation, sharedOperationExecutor, columnToMethodMatchingStrategy, null);
}

private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metrics) {
this.endpoints = endpoints;
this.configuration = configuration;
this.readOnlyConfig = Collections.unmodifiableMap(this.configuration);
this.endpoints.forEach(endpoint -> {
this.serverNodes.add(ClickHouseNode.of(endpoint, this.configuration));
});
this.metrics = metrics;
this.serializers = new ConcurrentHashMap<>();
this.deserializers = new ConcurrentHashMap<>();

Expand All @@ -178,15 +186,13 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
}
this.useNewImplementation = useNewImplementation;
if (useNewImplementation) {
this.httpClientHelper = new HttpAPIClientHelper(configuration);
this.httpClientHelper = new HttpAPIClientHelper(configuration, metrics);
LOG.info("Using new http client implementation");
} else {
this.oldClient = ClientV1AdaptorHelper.createClient(configuration);
LOG.info("Using old http client implementation");
}
this.columnToMethodMatchingStrategy = columnToMethodMatchingStrategy;


updateServerContext();
}

Expand Down Expand Up @@ -252,7 +258,7 @@ public static class Builder {

private ExecutorService sharedOperationExecutor = null;
private ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy;

private Object metric = null;
public Builder() {
this.endpoints = new HashSet<>();
this.configuration = new HashMap<String, String>();
Expand Down Expand Up @@ -967,6 +973,15 @@ public Builder useBearerTokenAuth(String bearerToken) {
this.httpHeader(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken);
return this;
}
public Builder addMetric(Object metric, String name) {
if (metric instanceof MeterRegistry) {
this.metric = metric;
this.configuration.put(ClientConfigProperties.METRICS_NAME.getKey(), name);
} else {
throw new IllegalArgumentException("Unsupported metric type. Only MeterRegistry is supported");
}
return this;
}

public Client build() {
setDefaults();
Expand Down Expand Up @@ -1027,7 +1042,7 @@ public Client build() {
}

return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor,
this.columnToMethodMatchingStrategy);
this.columnToMethodMatchingStrategy, this.metric);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ public enum ClientConfigProperties {
* Indicates that data provided for write operation is compressed by application.
*/
APP_COMPRESSED_DATA("app_compressed_data"),

/**
*
*/
METRICS_NAME("metrics_name"),
;

private String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
import com.clickhouse.client.api.enums.ProxyType;
import com.clickhouse.client.api.http.ClickHouseHttpProto;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.PoolingHttpClientConnectionManagerMetricsBinder;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.ConnectionConfig;
Expand All @@ -21,6 +23,7 @@
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext;
Expand Down Expand Up @@ -92,9 +95,10 @@ public class HttpAPIClientHelper {
private final Set<ClientFaultCause> defaultRetryCauses;

private String defaultUserAgent;

public HttpAPIClientHelper(Map<String, String> configuration) {
private Object metric;
public HttpAPIClientHelper(Map<String, String> configuration, Object metric) {
this.chConfiguration = configuration;
this.metric = metric;
this.httpClient = createHttpClient();

RequestConfig.Builder reqConfBuilder = RequestConfig.custom();
Expand Down Expand Up @@ -187,7 +191,6 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext,
PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create()
.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX);


ConnectionReuseStrategy connectionReuseStrategy =
ConnectionReuseStrategy.valueOf(chConfiguration.get("connection_reuse_strategy"));
switch (connectionReuseStrategy) {
Expand Down Expand Up @@ -219,7 +222,12 @@ private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext,
connMgrBuilder.setConnectionFactory(connectionFactory);
connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext));
connMgrBuilder.setDefaultSocketConfig(socketConfig);
return connMgrBuilder.build();
PoolingHttpClientConnectionManager phccm = connMgrBuilder.build();
if (metric != null && metric instanceof MeterRegistry) {
String name = chConfiguration.getOrDefault(ClientConfigProperties.METRICS_NAME.getKey(), "http-pool");
new PoolingHttpClientConnectionManagerMetricsBinder(phccm, name).bindTo((MeterRegistry) metric);
}
return phccm;
}

public CloseableHttpClient createHttpClient() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.clickhouse.client.metrics;

import com.clickhouse.client.BaseIntegrationTest;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseServerForTest;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.internal.ServerSettings;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;

public class MetricsTest extends BaseIntegrationTest {
private MeterRegistry meterRegistry;
@BeforeMethod
void setUp() {
meterRegistry = new SimpleMeterRegistry();
Metrics.globalRegistry.add(meterRegistry);
}

@AfterMethod
void tearDown() {
meterRegistry.clear();
Metrics.globalRegistry.clear();
}

@Test(groups = { "integration" }, enabled = true)
public void testRegisterMetrics() {
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
boolean isSecure = isCloud();
Client client = new Client.Builder()
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
.setUsername("default")
.setPassword(ClickHouseServerForTest.getPassword())
.setDefaultDatabase(ClickHouseServerForTest.getDatabase())
.serverSetting(ServerSettings.ASYNC_INSERT, "0")
.serverSetting(ServerSettings.WAIT_END_OF_QUERY, "1")
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
.addMetric(meterRegistry, "pool-test")
.build();
// currently there are only 5 metrics that are monitored by micrometer (out of the box)
assertEquals(meterRegistry.getMeters().size(), 5);
}
}