Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize]Connect 提交任务变更为只保存用户修改的配置,并修复JSON模式下配置展示不全(#1047) #1158

Merged
merged 7 commits into from
Oct 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterBrokersOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO;

Expand All @@ -23,5 +22,5 @@ public interface ClusterBrokersManager {
* @param clusterPhyId 物理集群 id
* @return 返回根据物理集群id获取到的集群对应broker状态信息
*/
Result<ClusterBrokersStateVO> getClusterPhyBrokersState(Long clusterPhyId);
ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
Expand All @@ -28,7 +26,6 @@
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
Expand Down Expand Up @@ -63,9 +60,6 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
@Autowired
private KafkaJMXClient kafkaJMXClient;

@Autowired
private ClusterPhyService clusterPhyService;

@Override
public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto) {
// 获取集群Broker列表
Expand Down Expand Up @@ -114,12 +108,7 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
}

@Override
public Result<ClusterBrokersStateVO> getClusterPhyBrokersState(Long clusterPhyId) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}

public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) {
ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO();

// 获取集群Broker列表
Expand All @@ -137,25 +126,24 @@ public Result<ClusterBrokersStateVO> getClusterPhyBrokersState(Long clusterPhyId
);

// 获取controller信息
Result<KafkaController> controllerResult = kafkaControllerService.getControllerFromKafka(clusterPhy);
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);

// 设置kafka-controller信息
clusterBrokersStateVO.setKafkaControllerAlive(false);
if(null != controllerResult.getData()) {
if(null != kafkaController) {
clusterBrokersStateVO.setKafkaController(
this.convert2KafkaControllerVO(
controllerResult.getData(),
brokerService.getBroker(clusterPhyId, controllerResult.getData().getBrokerId())
kafkaController,
brokerService.getBroker(clusterPhyId, kafkaController.getBrokerId())
)
);
clusterBrokersStateVO.setKafkaControllerAlive(true);
}

clusterBrokersStateVO.setConfigSimilar(
brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, KafkaConstant.CONFIG_SIMILAR_IGNORED_CONFIG_KEY_LIST) <= 0
clusterBrokersStateVO.setConfigSimilar(brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, KafkaConstant.CONFIG_SIMILAR_IGNORED_CONFIG_KEY_LIST) <= 0
);

return Result.buildSuc(clusterBrokersStateVO);
return clusterBrokersStateVO;
}

/**************************************************** private method ****************************************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ const babelOptions = {
cacheDirectory: true,
babelrc: false,
presets: [require.resolve('@babel/preset-env'), require.resolve('@babel/preset-typescript'), require.resolve('@babel/preset-react')],
overrides: [
// TODO:编译时需要做的事情更多,应该只针对目标第三方库
{
include: './node_modules',
sourceType: 'unambiguous'
}
],
plugins: [
[require.resolve('@babel/plugin-proposal-decorators'), { legacy: true }],
[require.resolve('@babel/plugin-proposal-class-properties'), { loose: true }],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ export enum ClustersPermissionMap {
CONNECTOR_DELETE = 'Connector-删除',
CONNECTOR_RESTART = 'Connector-重启',
CONNECTOR_STOP_RESUME = 'Connector-暂停&恢复',
// Security
SECURITY_ACL_ADD = 'Security-ACL新增',
SECURITY_ACL_DELETE = 'Security-ACL删除',
SECURITY_USER_ADD = 'Security-User新增',
SECURITY_USER_DELETE = 'Security-User删除',
SECURITY_USER_EDIT_PASSWORD = 'Security-User修改密码',
}

export interface PermissionNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,14 @@ const StepFormFirst = (props: SubFormProps) => {
const result: FormConnectorConfigs = {
pluginConfig: {},
};

// 获取一份默认配置
const defaultPluginConfig: any = {};

pluginConfig.configs.forEach(({ definition }) => {
// 获取一份默认配置
defaultPluginConfig[definition.name] = definition?.defaultValue;

if (!getExistFormItems(pluginType).includes(definition.name)) {
const pluginConfigs = result.pluginConfig;
const group = definition.group || 'Others';
Expand All @@ -205,7 +212,7 @@ const StepFormFirst = (props: SubFormProps) => {

Object.keys(result).length &&
form.setFieldsValue({
configs: result,
configs: { ...result, defaultPluginConfig, editConnectorConfig: result.connectorConfig },
});
})
.finally(() => props.setSubmitLoading(false));
Expand Down Expand Up @@ -957,6 +964,7 @@ export default forwardRef(
}) => void
) => {
const promises: Promise<any>[] = [];
const compareConfig = stepsFormRef.current[0].getFieldValue('configs'); // 获取步骤一的form信息
Object.values(stepsFormRef.current).forEach((form, i) => {
const promise = form
.validateFields()
Expand Down Expand Up @@ -987,11 +995,22 @@ export default forwardRef(
const [k, ...v] = l.split('=');
result[k] = v.join('=');
});

const editConnectorConfig = operateInfo.type === 'edit' ? compareConfig.editConnectorConfig : {}; // 编辑状态时拿到config配置
const newCompareConfig = { ...compareConfig.defaultPluginConfig, ...editConnectorConfig, ...result }; // 整合后的表单提交信息
Object.keys(newCompareConfig).forEach((item) => {
if (
newCompareConfig[item] === compareConfig.defaultPluginConfig[item] ||
newCompareConfig[item]?.toString() === compareConfig.defaultPluginConfig[item]?.toString()
) {
delete newCompareConfig[item]; // 清除默认值
}
});
callback({
success: {
connectClusterId: res[0].connectClusterId,
connectorName: result['name'],
config: result,
config: newCompareConfig,
},
});
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const PLACEHOLDER = `配置格式如下

{
"connectClusterName": "", // Connect Cluster 名称
"config": { // 具体配置项
"configs": { // 具体配置项
"name": "",
"connector.class": "",
"tasks.max": 1,
Expand Down Expand Up @@ -47,7 +47,7 @@ export default forwardRef((props: any, ref) => {
configs: JSON.stringify(
{
connectClusterName,
config: defaultConfigs,
configs: defaultConfigs,
},
null,
2
Expand All @@ -63,13 +63,13 @@ export default forwardRef((props: any, ref) => {
form.validateFields().then(
(data) => {
const postData = JSON.parse(data.configs);
postData.connectorName = postData.config.name;
postData.connectorName = postData.configs.name;
postData.connectClusterId = connectClusters.find((cluster) => cluster.label === postData.connectClusterName).value;
delete postData.connectClusterName;

Object.entries(postData.config).forEach(([key, val]) => {
Object.entries(postData.configs).forEach(([key, val]) => {
if (val === null) {
delete postData.config[key];
delete postData.configs[key];
}
});
Utils.put(api.validateConnectorConfig, postData).then(
Expand Down Expand Up @@ -198,34 +198,34 @@ export default forwardRef((props: any, ref) => {
}
}

if (!v.config || typeof v.config !== 'object') {
return Promise.reject('内容缺少 config 字段或字段格式错误');
if (!v.configs || typeof v.configs !== 'object') {
return Promise.reject('内容缺少 configs 字段或字段格式错误');
} else {
// 校验 connectorName 字段
if (!v.config.name) {
return Promise.reject('config 字段下缺少 name 项');
if (!v.configs.name) {
return Promise.reject('configs 字段下缺少 name 项');
} else {
if (type === 'edit' && v.config.name !== defaultConfigs.name) {
if (type === 'edit' && v.configs.name !== defaultConfigs.name) {
return Promise.reject('编辑模式下不允许修改 name 字段');
}
}
if (!v.config['connector.class']) {
return Promise.reject('config 字段下缺少 connector.class 项');
} else if (type === 'edit' && v.config['connector.class'] !== defaultConfigs['connector.class']) {
if (!v.configs['connector.class']) {
return Promise.reject('configs 字段下缺少 connector.class 项');
} else if (type === 'edit' && v.configs['connector.class'] !== defaultConfigs['connector.class']) {
return Promise.reject('编辑模式下不允许修改 connector.class 字段');
}
}

if (type === 'create') {
// 异步校验 connector 名称是否重复 以及 className 是否存在
return Promise.all([
Utils.request(api.isConnectorExist(connectClusterId, v.config.name)),
Utils.request(api.isConnectorExist(connectClusterId, v.configs.name)),
Utils.request(api.getConnectorPlugins(connectClusterId)),
]).then(
([data, plugins]: [any, ConnectorPlugin[]]) => {
return data?.exist
? Promise.reject('name 与已有 Connector 重复')
: plugins.every((plugin) => plugin.className !== v.config['connector.class'])
: plugins.every((plugin) => plugin.className !== v.configs['connector.class'])
? Promise.reject('该 connectCluster 下不存在 connector.class 项配置的插件')
: Promise.resolve();
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import AddACLDrawer, {
RESOURCE_TO_OPERATIONS_MAP,
RESOURCE_MAP_KEYS,
} from './EditDrawer';
import { ClustersPermissionMap } from '../CommonConfig';
import './index.less';

const { confirm } = Modal;
Expand Down Expand Up @@ -106,7 +105,7 @@ const SecurityACLs = (): JSX.Element => {
};

const columns = () => {
const baseColumns: any = [
const baseColumns = [
{
title: 'Principal',
dataIndex: 'kafkaUser',
Expand Down Expand Up @@ -144,9 +143,7 @@ const SecurityACLs = (): JSX.Element => {
title: 'Host',
dataIndex: 'aclClientHost',
},
];
if (global.hasPermission && global.hasPermission(ClustersPermissionMap.SECURITY_ACL_DELETE)) {
baseColumns.push({
{
title: '操作',
dataIndex: '',
width: 120,
Expand All @@ -159,8 +156,8 @@ const SecurityACLs = (): JSX.Element => {
</>
);
},
});
}
},
];

return baseColumns;
};
Expand Down Expand Up @@ -241,19 +238,15 @@ const SecurityACLs = (): JSX.Element => {
</Form.Item>
</Form>
</div>
{global.hasPermission && global.hasPermission(ClustersPermissionMap.SECURITY_ACL_ADD) ? (
<div className={`${tableHeaderPrefix}-right`}>
<Button
type="primary"
// icon={<PlusOutlined />}
onClick={() => editDrawerRef.current.onOpen(true, getACLs)}
>
新增ACL
</Button>
</div>
) : (
<></>
)}
<div className={`${tableHeaderPrefix}-right`}>
<Button
type="primary"
// icon={<PlusOutlined />}
onClick={() => editDrawerRef.current.onOpen(true, getACLs)}
>
新增ACL
</Button>
</div>
</div>
<ProTable
tableProps={{
Expand Down
Loading
Loading