Skip to content

Commit 5d57bec

Browse files
authored
Fix supply AdminClient creation (#875)
1 parent 1682872 commit 5d57bec

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.kafka.clients.admin.AdminClientConfig;
1717
import org.springframework.stereotype.Service;
1818
import reactor.core.publisher.Mono;
19+
import reactor.core.scheduler.Schedulers;
1920

2021
@Service
2122
@Slf4j
@@ -41,7 +42,7 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
4142
}
4243

4344
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
44-
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
45+
return Mono.fromSupplier(() -> {
4546
Properties properties = new Properties();
4647
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
4748
properties.putAll(cluster.getProperties());
@@ -52,7 +53,8 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
5253
"kafbat-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()
5354
);
5455
return AdminClient.create(properties);
55-
})).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
56+
}).subscribeOn(Schedulers.boundedElastic())
57+
.flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
5658
.onErrorMap(th -> new IllegalStateException(
5759
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
5860
}

0 commit comments

Comments
 (0)