forked from kafbat/kafka-ui
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathAdminClientServiceImpl.java
64 lines (55 loc) · 2.57 KB
/
AdminClientServiceImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package io.kafbat.ui.service;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@Slf4j
public class AdminClientServiceImpl implements AdminClientService, Closeable {
private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();
private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
private final int clientTimeout;
public AdminClientServiceImpl(ClustersProperties clustersProperties) {
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
}
@Override
public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
.switchIfEmpty(createAdminClient(cluster))
.map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
}
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
Properties properties = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent(
AdminClientConfig.CLIENT_ID_CONFIG,
"kafbat-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()
);
return AdminClient.create(properties);
})).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
.onErrorMap(th -> new IllegalStateException(
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
}
@Override
public void close() {
adminClientCache.values().forEach(ReactiveAdminClient::close);
}
}