Skip to content

Commit

Permalink
[Optimize]优化集群Brokers中, Controller显示存在延迟的问题 (#1162)
Browse files Browse the repository at this point in the history
优化方式:
从DB获取调整为从Kafka中实时获取。
  • Loading branch information
ZQKC authored Sep 27, 2023
1 parent e2ad3af commit 3b72f73
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterBrokersOverviewDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO;

Expand All @@ -22,5 +23,5 @@ public interface ClusterBrokersManager {
* @param clusterPhyId 物理集群 id
* @return 返回根据物理集群id获取到的集群对应broker状态信息
*/
ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId);
Result<ClusterBrokersStateVO> getClusterPhyBrokersState(Long clusterPhyId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
Expand All @@ -26,6 +28,7 @@
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
Expand Down Expand Up @@ -60,6 +63,9 @@ public class ClusterBrokersManagerImpl implements ClusterBrokersManager {
@Autowired
private KafkaJMXClient kafkaJMXClient;

@Autowired
private ClusterPhyService clusterPhyService;

@Override
public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto) {
// 获取集群Broker列表
Expand Down Expand Up @@ -108,7 +114,12 @@ public PaginationResult<ClusterBrokersOverviewVO> getClusterPhyBrokersOverview(L
}

@Override
public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) {
public Result<ClusterBrokersStateVO> getClusterPhyBrokersState(Long clusterPhyId) {
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(clusterPhyId);
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}

ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO();

// 获取集群Broker列表
Expand All @@ -126,24 +137,25 @@ public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) {
);

// 获取controller信息
KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId);
Result<KafkaController> controllerResult = kafkaControllerService.getControllerFromKafka(clusterPhy);

// 设置kafka-controller信息
clusterBrokersStateVO.setKafkaControllerAlive(false);
if(null != kafkaController) {
if(null != controllerResult.getData()) {
clusterBrokersStateVO.setKafkaController(
this.convert2KafkaControllerVO(
kafkaController,
brokerService.getBroker(clusterPhyId, kafkaController.getBrokerId())
controllerResult.getData(),
brokerService.getBroker(clusterPhyId, controllerResult.getData().getBrokerId())
)
);
clusterBrokersStateVO.setKafkaControllerAlive(true);
}

clusterBrokersStateVO.setConfigSimilar(brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, KafkaConstant.CONFIG_SIMILAR_IGNORED_CONFIG_KEY_LIST) <= 0
clusterBrokersStateVO.setConfigSimilar(
brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, KafkaConstant.CONFIG_SIMILAR_IGNORED_CONFIG_KEY_LIST) <= 0
);

return clusterBrokersStateVO;
return Result.buildSuc(clusterBrokersStateVO);
}

/**************************************************** private method ****************************************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Result<List<BrokerMetadataVO>> getClusterPhyBrokersMetadata(@PathVariable
@GetMapping(value = "clusters/{clusterPhyId}/brokers-state")
@ResponseBody
public Result<ClusterBrokersStateVO> getClusterPhyBrokersState(@PathVariable Long clusterPhyId) {
return Result.buildSuc(clusterBrokersManager.getClusterPhyBrokersState(clusterPhyId));
return clusterBrokersManager.getClusterPhyBrokersState(clusterPhyId);
}

@ApiOperation(value = "集群brokers信息列表")
Expand Down

0 comments on commit 3b72f73

Please sign in to comment.