Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KC: Stop Connectors and Reset Connector Offsets #573

Merged
merged 41 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
fd5fedf
allow development behind proxy
Sep 26, 2024
70517a0
Add dropdown options to pause and stop connectors
Sep 30, 2024
6ce7078
format kafkbat-ui openapi definition
Sep 30, 2024
503a2a1
Add a dropdown option to reset the offsets of stopped connectors
Sep 30, 2024
b819013
Disable the "Remove Connector" button if the action is not allowed fo…
Oct 1, 2024
26e075d
Disable reset offsets button when connector is not stopped
Oct 2, 2024
d9d269b
Add frontend tests of connector reset
Oct 4, 2024
971c2c9
Add frontend tests for the reset connector offsets button
Oct 4, 2024
2c8f054
Merge branch 'main' into feature/reset-connector-offsets
Dugong42 Oct 4, 2024
c9d8aad
Fix code formatting and typos
Oct 4, 2024
a36bc20
Fix the confirmation message
Oct 4, 2024
b21d04e
fix missing resetConnectorOffsets mock in frontend unit tests
Oct 4, 2024
0c3495d
Revert "format kafkbat-ui openapi definition"
Oct 7, 2024
193daa0
Revert formatting changes in "Add a dropdown option to reset the offs…
Oct 7, 2024
05df53e
Revert formatting changes in "Fix code formatting and typos"
Oct 7, 2024
7b289c3
revert formatting changes to the controller
Oct 7, 2024
0d695cd
fix frontend tests
Oct 7, 2024
76ae0f1
fix prettier linter checks
Oct 7, 2024
d95b56f
Improve refresh behaviour on connector actions
Oct 7, 2024
f7345da
Merge branch 'main' into feature/reset-connector-offsets
Dugong42 Oct 8, 2024
cffa239
refacto using static imports for resetConnectorOffsets actions
Oct 29, 2024
951b8c3
Merge remote-tracking branch 'origin/main' into feature/reset-connect…
Oct 30, 2024
e8e4347
tests: reset stopped connector
Nov 5, 2024
67599c1
kafka connect API get offsets
Nov 5, 2024
2aebcf6
FIX: Type casting errors
Dugong42 Nov 5, 2024
e52d6cc
Merge branch 'fix/type-casting-errors' into feature/reset-connector-o…
Dugong42 Nov 5, 2024
f7f4591
Test bad request when resetting running connector
Dugong42 Nov 5, 2024
8c64835
Throw controlled BadRequest and NotFound errors for resetConnectorOff…
Dugong42 Nov 6, 2024
cf7cbdd
Use Confluent 7.7 and fix tests for resetConnectorOffsets
Dugong42 Nov 6, 2024
8f5915c
typo in test
Dugong42 Nov 6, 2024
62598e4
retry on kafka connect test setup failure
Dugong42 Nov 6, 2024
793c0a5
Handle error 500 by repeating the kafka connect test setUp
Dugong42 Nov 7, 2024
4a499e4
Optimistic creation of test connectors. Check existence and ignore cr…
Dugong42 Nov 8, 2024
035f2b3
cleanup
Nov 8, 2024
2b23258
Merge branch 'main' into feature/reset-connector-offsets
Dugong42 Nov 18, 2024
cb06d48
Merge commit 'a8811d1be3a0db2b02afa744c404ac961572b0d1' into feature/…
Dugong42 Jan 3, 2025
0d8ba54
adapt to connect retry mechanism
Dugong42 Jan 3, 2025
e802bf1
rollback test workaround leftovers
Dugong42 Jan 3, 2025
0e1bdb9
rollback forced typing leftovers
Dugong42 Jan 3, 2025
fbb6ddd
fix indent
Jan 3, 2025
513e3f9
Merge branch 'main' into feature/reset-connector-offsets
Haarolean Jan 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .mvn/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-Djava.net.useSystemProxies=true
1 change: 1 addition & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@
</goals>
<configuration>
<arguments>build</arguments>
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseE
return withRetryOnConflict(super.pauseConnector(connectorName));
}

@Override
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.stopConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@
@RequiredArgsConstructor
@Slf4j
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private static final Set<ConnectorActionDTO> RESTART_ACTIONS = Set.of(RESTART, RESTART_FAILED_TASKS,
RESTART_ALL_TASKS);
private static final String CONNECTOR_NAME = "connectorName";

private final KafkaConnectService kafkaConnectService;

@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
ServerWebExchange exchange) {
ServerWebExchange exchange) {

Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
.filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
Expand All @@ -52,7 +52,7 @@ public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,

@Override
public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
ServerWebExchange exchange) {
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
Expand All @@ -61,14 +61,15 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
.build();

return validateAccess(context)
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
.thenReturn(
ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, String connectName,
@Valid Mono<NewConnectorDTO> connector,
ServerWebExchange exchange) {
@Valid Mono<NewConnectorDTO> connector,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
Expand All @@ -78,14 +79,14 @@ public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, St

return validateAccess(context).then(
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, String connectName,
String connectorName,
ServerWebExchange exchange) {
String connectorName,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
Expand All @@ -95,14 +96,14 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin

return validateAccess(context).then(
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
String connectorName,
ServerWebExchange exchange) {
String connectorName,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
Expand All @@ -113,19 +114,17 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con

return validateAccess(context).then(
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}


@Override
public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
String clusterName,
String search,
ConnectorColumnsToSortDTO orderBy,
SortOrderDTO sortOrder,
ServerWebExchange exchange
) {
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("getAllConnectors")
Expand All @@ -145,9 +144,9 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(

@Override
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
String connectName,
String connectorName,
ServerWebExchange exchange) {
String connectName,
String connectorName,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
Expand All @@ -158,15 +157,15 @@ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clust
return validateAccess(context).then(
kafkaConnectService
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
String connectorName,
Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {
String connectorName,
Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
Expand All @@ -176,22 +175,22 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
.build();

return validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
String connectorName,
ConnectorActionDTO action,
ServerWebExchange exchange) {
String connectorName,
ConnectorActionDTO action,
ServerWebExchange exchange) {
ConnectAction[] connectActions;
if (RESTART_ACTIONS.contains(action)) {
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
connectActions = new ConnectAction[] { ConnectAction.VIEW, ConnectAction.RESTART };
} else {
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
connectActions = new ConnectAction[] { ConnectAction.VIEW, ConnectAction.EDIT };
}

var context = AccessContext.builder()
Expand All @@ -204,15 +203,15 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
return validateAccess(context).then(
kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
String connectName,
String connectorName,
ServerWebExchange exchange) {
String connectName,
String connectorName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, ConnectAction.VIEW)
Expand All @@ -223,14 +222,14 @@ public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
return validateAccess(context).thenReturn(
ResponseEntity
.ok(kafkaConnectService
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
).doOnEach(sig -> audit(context, sig));
.getConnectorTasks(getCluster(clusterName), connectName, connectorName)))
.doOnEach(sig -> audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
String connectorName, Integer taskId,
ServerWebExchange exchange) {
String connectorName, Integer taskId,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
Expand All @@ -242,8 +241,8 @@ public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, Strin
return validateAccess(context).then(
kafkaConnectService
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -259,8 +258,8 @@ public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
).doOnEach(sig -> audit(context, sig));
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName))))
.doOnEach(sig -> audit(context, sig));
}

@Override
Expand All @@ -285,4 +284,26 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
default -> defaultComparator;
};
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
Dugong42 marked this conversation as resolved.
Show resolved Hide resolved
String connectorName,
ServerWebExchange exchange) {
ConnectAction[] connectActions;

connectActions = new ConnectAction[] { ConnectAction.VIEW, ConnectAction.RESET_OFFSETS };

var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, connectActions)
Dugong42 marked this conversation as resolved.
Show resolved Hide resolved
.operationName("resetConnectorOffsets")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public enum ConnectAction implements PermissibleAction {
EDIT(VIEW),
CREATE(VIEW),
RESTART(VIEW),
DELETE(VIEW)
DELETE(VIEW),
RESET_OFFSETS(VIEW)

;

Expand All @@ -20,7 +21,7 @@ public enum ConnectAction implements PermissibleAction {
this.dependantActions = dependantActions;
}

public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART);
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART, RESET_OFFSETS);

@Nullable
public static ConnectAction fromString(String name) {
Expand Down
Loading
Loading