From 0d8ba5416b376f09b7039516b0ef73da3947411b Mon Sep 17 00:00:00 2001 From: Dugong Date: Fri, 3 Jan 2025 11:11:43 +0000 Subject: [PATCH] adapt to connect retry mechanism use the proper return type for the client interface fix stopConnector override with new retry mechanism --- .../ui/client/RetryingKafkaConnectClient.java | 23 ++++++++++++++++--- .../kafbat/ui/KafkaConnectServiceTests.java | 12 +--------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java index 0f2185bf2..df2da3e55 100644 --- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java +++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java @@ -233,14 +233,19 @@ public Mono pauseConnector(String connectorName) throws WebClientResponseE return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName)); } + @Override + public Mono> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException { + return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName)); + } + @Override public Mono stopConnector(String connectorName) throws WebClientResponseException { - return withRetryOnConflict(super.stopConnector(connectorName)); + return withRetryOnConflictOrRebalance(super.stopConnector(connectorName)); } @Override - public Mono> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException { - return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName)); + public Mono> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException { + return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName)); } @Override @@ -266,6 +271,18 @@ public Mono> restartConnectorTaskWithHttpInfo(String connec return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId)); } + @Override + public Mono resetConnectorOffsets(String connectorName) + throws WebClientResponseException { + return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName)); + } + + @Override + public Mono> resetConnectorOffsetsWithHttpInfo(String connectorName) + throws WebClientResponseException { + return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName)); + } + @Override public Mono resumeConnector(String connectorName) throws WebClientResponseException { return withRetryOnRebalance(super.resumeConnector(connectorName)); diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java index 1415f8d69..b12d5e78b 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java @@ -59,20 +59,10 @@ public void setUp() { "file", "/tmp/test", "test.password", "test-credentials"))) .exchange() + .expectStatus().isOk() .expectBody() .returnResult(); - webTestClient.get() - .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", - LOCAL, connectName, connectorName) - .exchange() - .expectStatus().isOk(); - - // Kafka Connect may return transient HTTP 500 errors during rebalances - if (creationResult.getStatus() != HttpStatus.OK) { - log.warn( - "Ignoring a transient error while setting up the tested connector, because it has been created anyway."); - } } @AfterEach