Skip to content

Commit

Permalink
adapt to connect retry mechanism
Browse files Browse the repository at this point in the history
use the proper return type for the client interface

fix stopConnector override with new retry mechanism
  • Loading branch information
Dugong42 committed Jan 3, 2025
1 parent cb06d48 commit 0d8ba54
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,19 @@ public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseE
return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.stopConnector(connectorName));
return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
public Mono<ResponseEntity<Void>> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
}

@Override
Expand All @@ -266,6 +271,18 @@ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connec
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}

@Override
public Mono<Void> resetConnectorOffsets(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsetsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
}

@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return withRetryOnRebalance(super.resumeConnector(connectorName));
Expand Down
12 changes: 1 addition & 11 deletions api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,10 @@ public void setUp() {
"file", "/tmp/test",
"test.password", "test-credentials")))
.exchange()
.expectStatus().isOk()
.expectBody()
.returnResult();

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk();

// Kafka Connect may return transient HTTP 500 errors during rebalances
if (creationResult.getStatus() != HttpStatus.OK) {
log.warn(
"Ignoring a transient error while setting up the tested connector, because it has been created anyway.");
}
}

@AfterEach
Expand Down

0 comments on commit 0d8ba54

Please sign in to comment.