diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java index 6c018ba31..68f461e7e 100644 --- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java +++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; @@ -40,7 +41,7 @@ public Mono get(KafkaCluster cluster) { } private Mono createAdminClient(KafkaCluster cluster) { - return Mono.fromSupplier(() -> { + return Mono.fromFuture(CompletableFuture.supplyAsync(() -> { Properties properties = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); @@ -51,7 +52,7 @@ private Mono createAdminClient(KafkaCluster cluster) { "kafbat-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet() ); return AdminClient.create(properties); - }).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close())) + })).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close())) .onErrorMap(th -> new IllegalStateException( "Error while creating AdminClient for the cluster " + cluster.getName(), th)); }