From 8ae5ed921fef6c3a3a82faaddc98c4ecb586e78d Mon Sep 17 00:00:00 2001 From: Thomas Newman Date: Wed, 15 Jan 2025 12:16:46 -0500 Subject: [PATCH 1/3] Create Kafka Admin Client outside of the Parallel scheduler thread pool --- .../java/io/kafbat/ui/service/AdminClientServiceImpl.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java index 6c018ba31..8472b73bf 100644 --- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java +++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; @@ -15,6 +16,8 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; @Service @Slf4j @@ -40,7 +43,7 @@ public Mono get(KafkaCluster cluster) { } private Mono createAdminClient(KafkaCluster cluster) { - return Mono.fromSupplier(() -> { + return Mono.fromFuture(CompletableFuture.supplyAsync(() -> { Properties properties = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); @@ -51,7 +54,8 @@ private Mono createAdminClient(KafkaCluster cluster) { "kafbat-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet() ); return AdminClient.create(properties); - }).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close())) + })) + .flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close())) .onErrorMap(th -> new IllegalStateException( "Error while creating AdminClient for the cluster " + cluster.getName(), th)); } From 36ad951efbb701cc0d3d66ce4d8708929cbb1195 Mon Sep 17 00:00:00 2001 From: Thomas Newman Date: Wed, 15 Jan 2025 12:19:38 -0500 Subject: [PATCH 2/3] Removed unused imports --- .../main/java/io/kafbat/ui/service/AdminClientServiceImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java index 8472b73bf..881adbd8c 100644 --- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java +++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java @@ -16,8 +16,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; @Service @Slf4j From da0b26dfdc41dc8ba7e6b9083012c8205a611955 Mon Sep 17 00:00:00 2001 From: Thomas Newman Date: Wed, 15 Jan 2025 12:20:21 -0500 Subject: [PATCH 3/3] Fixed indentation --- .../main/java/io/kafbat/ui/service/AdminClientServiceImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java index 881adbd8c..68f461e7e 100644 --- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java +++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java @@ -52,8 +52,7 @@ private Mono createAdminClient(KafkaCluster cluster) { "kafbat-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet() ); return AdminClient.create(properties); - })) - .flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close())) + })).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close())) .onErrorMap(th -> new IllegalStateException( "Error while creating AdminClient for the cluster " + cluster.getName(), th)); }