Skip to content

Commit

Permalink
Merge branch 'main' into pnpm-9.15.4
Browse files Browse the repository at this point in the history
  • Loading branch information
yeikel authored Jan 14, 2025
2 parents 51bbbfb + a159ef6 commit e02f987
Show file tree
Hide file tree
Showing 52 changed files with 555 additions and 171 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/frontend_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Install node
uses: actions/[email protected]
with:
node-version: "18.17.1"
node-version: "22.12.0"
cache: "pnpm"
cache-dependency-path: "./frontend/pnpm-lock.yaml"

Expand Down
1 change: 1 addition & 0 deletions .mvn/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-Djava.net.useSystemProxies=true
1 change: 1 addition & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@
</goals>
<configuration>
<arguments>build</arguments>
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorNam
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
Expand All @@ -261,6 +271,18 @@ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connec
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}

@Override
public Mono<Void> resetConnectorOffsets(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsetsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
}

@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return withRetryOnRebalance(super.resumeConnector(connectorName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;

import io.kafbat.ui.api.KafkaConnectApi;
import io.kafbat.ui.model.ConnectDTO;
Expand Down Expand Up @@ -285,4 +287,23 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
default -> defaultComparator;
};
}

@Override
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
String connectorName,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, VIEW, RESET_OFFSETS)
.operationName("resetConnectorOffsets")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return validateAccess(context).then(
kafkaConnectService
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.kafbat.ui.exception;

public class ConnectorOffsetsResetException extends CustomBaseException {

public ConnectorOffsetsResetException(String message) {
super(message);
}

@Override
public ErrorCode getErrorCode() {
return ErrorCode.CONNECTOR_OFFSETS_RESET_ERROR;
}
}
1 change: 1 addition & 0 deletions api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum ErrorCode {
TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST),
FILE_UPLOAD_EXCEPTION(4019, HttpStatus.INTERNAL_SERVER_ERROR),
CEL_ERROR(4020, HttpStatus.BAD_REQUEST),
CONNECTOR_OFFSETS_RESET_ERROR(4021, HttpStatus.BAD_REQUEST),
;

static {
Expand Down
1 change: 0 additions & 1 deletion api/src/main/java/io/kafbat/ui/model/BrokerMetrics.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.model;

import io.kafbat.ui.model.MetricDTO;
import java.util.List;
import lombok.Builder;
import lombok.Data;
Expand Down
12 changes: 2 additions & 10 deletions api/src/main/java/io/kafbat/ui/model/CleanupPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,10 @@ public enum CleanupPolicy {
this.policies = policies;
}

public String getPolicy() {
return policies.get(0);
}

public static CleanupPolicy fromString(String string) {
return Arrays.stream(CleanupPolicy.values())
.filter(v ->
v.policies.stream().anyMatch(
s -> s.equals(string.replace(" ", "")
)
)
).findFirst()
.filter(v -> v.policies.stream().anyMatch(s -> s.equals(string.replace(" ", ""))))
.findFirst()
.orElse(UNKNOWN);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.model;


import java.util.List;
import lombok.Builder;
import lombok.Data;
Expand All @@ -17,13 +16,13 @@ public class InternalBrokerConfig {
private final List<ConfigEntry.ConfigSynonym> synonyms;

public static InternalBrokerConfig from(ConfigEntry configEntry, boolean readOnlyCluster) {
InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder()
return InternalBrokerConfig.builder()
.name(configEntry.name())
.value(configEntry.value())
.source(configEntry.source())
.isReadOnly(readOnlyCluster || configEntry.isReadOnly())
.isSensitive(configEntry.isSensitive())
.synonyms(configEntry.synonyms());
return builder.build();
.synonyms(configEntry.synonyms())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package io.kafbat.ui.model;

import io.kafbat.ui.model.MetricDTO;
import io.kafbat.ui.model.ServerStatusDTO;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;


@Data
@Builder(toBuilder = true)
public class InternalClusterMetrics {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.kafbat.ui.model;

import com.google.common.base.Throwables;
import io.kafbat.ui.model.BrokerDiskUsageDTO;
import io.kafbat.ui.model.MetricsCollectionErrorDTO;
import io.kafbat.ui.model.ServerStatusDTO;
import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@ public class InternalPartition {
private final Long segmentSize;
private final Integer segmentCount;


}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import lombok.Value;
import org.apache.kafka.common.TopicPartition;


public class InternalPartitionsOffsets {

@Value
Expand Down
13 changes: 0 additions & 13 deletions api/src/main/java/io/kafbat/ui/model/InternalSegmentSizeDto.java

This file was deleted.

3 changes: 0 additions & 3 deletions api/src/main/java/io/kafbat/ui/model/InternalTopic.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.model;

import io.kafbat.ui.config.ClustersProperties;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
Expand All @@ -16,8 +15,6 @@
@Builder(toBuilder = true)
public class InternalTopic {

ClustersProperties clustersProperties;

// from TopicDescription
private final String name;
private final boolean internal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import lombok.Data;
import org.apache.kafka.clients.admin.ConfigEntry;


@Data
@Builder
public class InternalTopicConfig {
Expand Down
1 change: 0 additions & 1 deletion api/src/main/java/io/kafbat/ui/model/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import lombok.Builder;
import lombok.Value;


@Builder
@Value
public class Metrics {
Expand Down
1 change: 0 additions & 1 deletion api/src/main/java/io/kafbat/ui/model/Statistics.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.model;

import io.kafbat.ui.model.ServerStatusDTO;
import io.kafbat.ui.service.ReactiveAdminClient;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public enum ConnectAction implements PermissibleAction {
EDIT(VIEW),
CREATE(VIEW),
RESTART(VIEW),
DELETE(VIEW)
DELETE(VIEW),
RESET_OFFSETS(VIEW)

;

Expand All @@ -20,7 +21,7 @@ public enum ConnectAction implements PermissibleAction {
this.dependantActions = dependantActions;
}

public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART);
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART, RESET_OFFSETS);

@Nullable
public static ConnectAction fromString(String name) {
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

18 changes: 18 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTopics;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.mapper.ClusterMapper;
Expand Down Expand Up @@ -213,6 +214,7 @@ public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE -> client.pauseConnector(connectorName);
case STOP -> client.stopConnector(connectorName);
case RESUME -> client.resumeConnector(connectorName);
});
}
Expand Down Expand Up @@ -272,4 +274,20 @@ private ReactiveFailover<KafkaConnectClientApi> api(KafkaCluster cluster, String
}
return client;
}

public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName,
String connectorName) {
return api(cluster, connectName)
.mono(client -> client.resetConnectorOffsets(connectorName))
.onErrorResume(WebClientResponseException.NotFound.class,
e -> {
throw new NotFoundException("Connector %s not found in %s".formatted(connectorName, connectName));
})
.onErrorResume(WebClientResponseException.BadRequest.class,
e -> {
throw new ConnectorOffsetsResetException(
"Failed to reset offsets of connector %s of %s. Make sure it is STOPPED first."
.formatted(connectorName, connectName));
});
}
}
Loading

0 comments on commit e02f987

Please sign in to comment.