From c53c3e35344afdaedade2348ce74081cf78809bc Mon Sep 17 00:00:00 2001
From: RokolyA94
Date: Mon, 10 Jul 2023 17:37:15 +0200
Subject: [PATCH] CB-22003 Extends the Datalake horizontal scaling flow with
 node status validation.

We cannot execute horizontal scaling if CM is not reachable.

Validations:
- Check the stack status; the stack has to be available.
- Check the CM status; CM has to be reachable.
- The target host group has to be scalable.
- Do not allow downscale for the solr, kafka and storage add-on host groups.
- Validate that the desired count does not break the minimum EDL setup.
---
 .../DatalakeInstanceGroupScalingDetails.java  |   2 +-
 .../sdx/SdxHorizontalScalingService.java      | 133 +++++++++++++-----
 .../sdx/SdxHorizontalScalingServiceTest.java  |  15 ++-
 3 files changed, 120 insertions(+), 30 deletions(-)

diff --git a/datalake/src/main/java/com/sequenceiq/datalake/entity/DatalakeInstanceGroupScalingDetails.java b/datalake/src/main/java/com/sequenceiq/datalake/entity/DatalakeInstanceGroupScalingDetails.java
index a48e5fcf950..93bddfd22db 100644
--- a/datalake/src/main/java/com/sequenceiq/datalake/entity/DatalakeInstanceGroupScalingDetails.java
+++ b/datalake/src/main/java/com/sequenceiq/datalake/entity/DatalakeInstanceGroupScalingDetails.java
@@ -7,7 +7,7 @@ public enum DatalakeInstanceGroupScalingDetails {
     IDBROKER("idbroker", false, 2),
     GATEWAY("gateway", false, 2),
     AUXILIARY("auxiliary", false, 1),
-    CORE("core", true, 3),
+    CORE("core", false, 3),
     SOLRHG("solrhg", true, 0),
     STORAGEHG("storagehg", true, 0),
     KAFKAHG("kafkahg", true, 0),
diff --git a/datalake/src/main/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingService.java b/datalake/src/main/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingService.java
index 0101acc9780..f823c080d62 100644
--- a/datalake/src/main/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingService.java
+++ b/datalake/src/main/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingService.java
@@ -1,24 +1,32 @@
 package com.sequenceiq.datalake.service.sdx;
 
 import static com.sequenceiq.cloudbreak.common.exception.NotFoundException.notFound;
+import static com.sequenceiq.cloudbreak.common.mappable.CloudPlatform.YARN;
 import static com.sequenceiq.sdx.api.model.SdxClusterShape.ENTERPRISE;
 
+import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 import javax.ws.rs.ProcessingException;
 import javax.ws.rs.WebApplicationException;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.StackV4Endpoint;
+import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.base.InstanceGroupV4Base;
+import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.base.InstanceMetadataType;
+import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.base.InstanceStatus;
 import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.request.StackScaleV4Request;
 import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.StackV4Response;
 import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.instancegroup.InstanceGroupV4Response;
+import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.instancegroup.instancemetadata.InstanceMetaDataV4Response;
 import com.sequenceiq.cloudbreak.auth.ThreadBasedUserCrnProvider;
 import com.sequenceiq.cloudbreak.auth.altus.EntitlementService;
 import com.sequenceiq.cloudbreak.auth.crn.RegionAwareInternalCrnGeneratorFactory;
@@ -64,13 +72,34 @@ public class SdxHorizontalScalingService {
     @Inject
     private CloudbreakFlowService cloudbreakFlowService;
 
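+    /**
+     * Collects the instance metadata of the scaling target host group and the gateway
+     * host group, keyed by lower-cased host group name, so the node count validation
+     * can look up both groups in a single pass over the stack response.
+     */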
+    private static Map<String, Set<InstanceMetaDataV4Response>> getInstanceMap(StackV4Response stack, String target, String gatewayInstanceName) {
+        Map<String, Set<InstanceMetaDataV4Response>> instanceMap = stack.getInstanceGroups().stream()
+                .filter(instance -> instance.getName().equalsIgnoreCase(target) || instance.getName().equalsIgnoreCase(gatewayInstanceName))
+                .collect(Collectors.toMap(instance -> instance.getName().toLowerCase(Locale.ROOT), InstanceGroupV4Response::getMetadata));
+        return instanceMap;
+    }
+
+    private List<InstanceMetaDataV4Response> getGatewayUnhealthyMetadata(String gatewayInstanceName, Map<String, Set<InstanceMetaDataV4Response>> instanceMap) {
+        List<InstanceMetaDataV4Response> gatewayMetaData = instanceMap.get(gatewayInstanceName).stream()
+                .filter(data -> data.getInstanceType() == InstanceMetadataType.GATEWAY_PRIMARY
+                        && !data.getInstanceStatus().equals(InstanceStatus.SERVICES_HEALTHY)
+                        && !data.getInstanceStatus().equals(InstanceStatus.SERVICES_RUNNING))
+                .toList();
+        return gatewayMetaData;
+    }
+
     public FlowIdentifier horizontalScaleDatalake(String name, DatalakeHorizontalScaleRequest scaleRequest) {
         String accountId = ThreadBasedUserCrnProvider.getAccountId();
         if (!entitlementService.isDatalakeHorizontalScaleEnabled(accountId)) {
-            throw new BadRequestException(String.format("Datalake horizontal scale is not enabled for account %s.", accountId));
+            throw new BadRequestException(String.format("Data lake horizontal scale is not enabled for account %s.", accountId));
         }
-        LOGGER.info("Horizontal scaling Datalake with name {}", name);
+        LOGGER.info("Horizontal scaling Data lake with name {}", name);
         SdxCluster sdxCluster = sdxClusterRepository.findByAccountIdAndClusterNameAndDeletedIsNullAndDetachedIsFalse(accountId, name)
                 .orElseThrow(() -> notFound("SDX cluster", name).get());
         return sdxReactorFlowManager.triggerHorizontalScaleDataLake(sdxCluster, scaleRequest);
     }
@@ -99,46 +128,98 @@ public String triggerScalingFlow(SdxCluster sdxCluster, StackScaleV4Request scal
     }
 
     public void validateHorizontalScaleRequest(SdxCluster sdxCluster, DatalakeHorizontalScaleRequest request) {
-        if (!ENTERPRISE.equals(sdxCluster.getClusterShape())) {
-            throw new BadRequestException(String.format("scaling not supported on: %s. " +
-                    "Only the ENTERPRISE Datalake shape support the horizontal scaling functionality.", sdxCluster.getClusterShape().name()));
+        if (ENTERPRISE != sdxCluster.getClusterShape()) {
+            throw new BadRequestException(String.format("Horizontal scaling is not supported on the %s shape. Please use the ENTERPRISE Data lake shape.",
+                    sdxCluster.getClusterShape().name()));
         }
         String hostGroup = request.getGroup();
         if (!DatalakeInstanceGroupScalingDetails.valueOf(hostGroup.toUpperCase(Locale.ROOT)).isScalable()) {
-            throw new BadRequestException(String.format("%s host group cannot scale", hostGroup));
+            throw new BadRequestException(String.format("%s hostgroup cannot be scaled", hostGroup));
         }
         DetailedEnvironmentResponse environmentResponse = environmentClientService.getByName(sdxCluster.getEnvName());
         if (!EnvironmentStatus.upscalable().contains(environmentResponse.getEnvironmentStatus())) {
-            throw new BadRequestException(String.format("The DAtalake cannot be scaled because the environment %s is in the %s status.",
-                    sdxCluster.getEnvName(), environmentResponse.getEnvironmentStatus().name()));
+            throw new BadRequestException(String.format("The Data lake cannot be scaled in the %s environment status.",
                    environmentResponse.getEnvironmentStatus().name()));
         }
         StackV4Response stackResponse = stackV4Endpoint.getByCrn(1L, sdxCluster.getStackCrn(), Set.of());
-        String nodeRequestValidity = isNodeRequestValid(stackResponse, request);
-        if (StringUtils.isNotEmpty(nodeRequestValidity)) {
-            throw new BadRequestException(String.format("The requested nodeCount does not reach the minimum condition of horizontal scaling. Issues: %s",
-                    nodeRequestValidity));
+        if (YARN == stackResponse.getCloudPlatform()) {
+            throw new BadRequestException("Data lake horizontal scaling is not supported on YCloud");
+        }
+        Optional<String> nodeRequestValid = validateNodeRequest(stackResponse, request);
+        if (nodeRequestValid.isPresent()) {
+            throw new BadRequestException(nodeRequestValid.get());
         }
     }
 
-    private String isNodeRequestValid(StackV4Response stack, DatalakeHorizontalScaleRequest request) {
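+    /**
+     * Validates the requested node count against the current stack. Returns the
+     * reason the request cannot be fulfilled, or an empty Optional when the
+     * horizontal scaling request is valid.
+     */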
+    private Optional<String> validateNodeRequest(StackV4Response stack, DatalakeHorizontalScaleRequest request) {
         if (request.getDesiredCount() < 0) {
             LOGGER.warn("Negative nodeCount was requested.");
-            return "Negative nodeCount is not accepted. If you want to downsacle your cluster please use lower number than the acutal node count";
+            return Optional.of("Negative nodeCount is not accepted. If you want to downscale your Data lake, please "
+                    + "use a lower number than the actual node count.");
         }
-        String target = request.getGroup();
-        boolean hostGroupNotExists = stack.getInstanceGroups().stream()
-                .map(InstanceGroupV4Response::getName)
-                .noneMatch(target::equalsIgnoreCase);
-        if (hostGroupNotExists) {
+        String target = request.getGroup().toLowerCase(Locale.ROOT);
+        String gatewayInstanceName = DatalakeInstanceGroupScalingDetails.GATEWAY.getName().toLowerCase(Locale.ROOT);
+        Map<String, Set<InstanceMetaDataV4Response>> instanceMap = getInstanceMap(stack, target, gatewayInstanceName);
+        if (!instanceMap.containsKey(target)) {
             LOGGER.warn("The requested hostgroup name not found!");
-            return String.format("The requested hostgroup name not found! The requested host group name is %s", target);
+            return Optional.of(String.format("The requested hostgroup was not found! The requested hostgroup name is %s", target));
+        }
+        if (!instanceMap.containsKey(gatewayInstanceName)) {
+            LOGGER.error("Gateway node is not found in the stack instance map. The horizontal scale cannot be performed while CM is not "
+                    + "available. StackId: {}", stack.getId());
+            List<String> instanceGroupList = stack.getInstanceGroups().stream().map(InstanceGroupV4Base::getName).toList();
+            return Optional.of(String.format("The Gateway instance was not found. The horizontal scale cannot be performed while CM is not reachable"
+                    + ", found instances: %s", instanceGroupList));
+        }
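+        // CM runs on the primary gateway instance; the scaling flow needs a reachable CM,
+        // so a primary gateway that is neither healthy nor running blocks the request.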
+        List<InstanceMetaDataV4Response> gatewayMetaData = getGatewayUnhealthyMetadata(gatewayInstanceName, instanceMap);
+        if (!gatewayMetaData.isEmpty()) {
+            LOGGER.warn("Gateway (CM node) is not healthy. Cannot start horizontal scaling");
+            List<InstanceStatus> instanceStatuses = gatewayMetaData.stream().map(InstanceMetaDataV4Response::getInstanceStatus).toList();
+            return Optional.of(String.format("Gateway instances are not healthy. Please repair the node and retry horizontal"
+                    + " scaling. Actual Gateway statuses: %s", instanceStatuses));
         }
-        DatalakeInstanceGroupScalingDetails instanceGroupName = DatalakeInstanceGroupScalingDetails.valueOf(request.getGroup().toUpperCase(Locale.ROOT));
-        if (instanceGroupName.getMinimumNodeCount() > request.getDesiredCount()) {
+        DatalakeInstanceGroupScalingDetails targetInstanceGroupName = DatalakeInstanceGroupScalingDetails.valueOf(request.getGroup().toUpperCase(Locale.ROOT));
+        if (targetInstanceGroupName.getMinimumNodeCount() > request.getDesiredCount()) {
             LOGGER.warn("Requested nodeCount is less than the minimum nodeCount.");
-            return String.format("Requested nodeCount is less than the minimum nodeCount. The minimum node cont for %s is %d",
-                    instanceGroupName.name(), instanceGroupName.getMinimumNodeCount());
+            return Optional.of(String.format("Requested node count is less than the minimum node count. The minimum node count for %s is %d",
+                    targetInstanceGroupName.name(), targetInstanceGroupName.getMinimumNodeCount()));
         }
-        return "";
+        if (isDownscaleBlocked(targetInstanceGroupName) && isDownscale(stack, request)) {
+            return Optional.of(String.format("Downscaling the %s hostgroup is not supported, because it can cause data loss", targetInstanceGroupName.name()));
+        }
+        return Optional.empty();
+    }
+
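+    /**
+     * Downscale is blocked for the Solr, Kafka and storage hostgroups, because
+     * removing nodes from them can cause data loss.
+     */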
+    private boolean isDownscaleBlocked(DatalakeInstanceGroupScalingDetails targetInstanceGroupName) {
+        return switch (targetInstanceGroupName) {
+            case STORAGEHG, KAFKAHG, SOLRHG -> true;
+            default -> false;
+        };
+    }
+
+    private boolean isInstanceRunning(InstanceStatus status) {
+        return switch (status) {
+            case CREATED, SERVICES_RUNNING, SERVICES_HEALTHY, DECOMMISSIONED, DECOMMISSION_FAILED -> true;
+            default -> false;
+        };
+    }
+
+    private boolean isDownscale(StackV4Response stack, DatalakeHorizontalScaleRequest request) {
+        return stack.getInstanceGroups().stream()
+                .filter(instance -> instance.getName().equals(request.getGroup().toLowerCase(Locale.ROOT)))
+                .map(InstanceGroupV4Base::getNodeCount)
+                .mapToInt(Integer::intValue)
+                .sum() > request.getDesiredCount();
     }
 }
diff --git a/datalake/src/test/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingServiceTest.java b/datalake/src/test/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingServiceTest.java
index 5a3bec417cd..99a6f2beea2 100644
--- a/datalake/src/test/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingServiceTest.java
+++ b/datalake/src/test/java/com/sequenceiq/datalake/service/sdx/SdxHorizontalScalingServiceTest.java
@@ -23,6 +23,7 @@
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import com.sequenceiq.cloudbreak.api.endpoint.v4.common.Status;
 import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.StackV4Endpoint;
 import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.request.StackScaleV4Request;
 import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.StackV4Response;
@@ -114,9 +115,10 @@ void testDatalakeHorizontalScaleValidation() {
         scaleRequest.setDesiredCount(0);
         assertThrows(BadRequestException.class, () -> underTest.validateHorizontalScaleRequest(sdxCluster, scaleRequest));
         scaleRequest.setGroup("core");
-        scaleRequest.setDesiredCount(1);
+        scaleRequest.setDesiredCount(3);
         assertThrows(BadRequestException.class, () -> underTest.validateHorizontalScaleRequest(sdxCluster, scaleRequest));
-        scaleRequest.setDesiredCount(4);
+        scaleRequest.setGroup("solrhg");
+        scaleRequest.setDesiredCount(3);
         underTest.validateHorizontalScaleRequest(sdxCluster, scaleRequest);
     }
 
@@ -136,7 +138,14 @@ private StackV4Response getStackV4Response() {
         InstanceGroupV4Response auxiliary = new InstanceGroupV4Response();
         auxiliary.setName("auxiliary");
         auxiliary.setMinimumNodeCount(1);
-        stackV4Response.setInstanceGroups(List.of(core, auxiliary));
+        InstanceGroupV4Response gateway = new InstanceGroupV4Response();
+        gateway.setName("gateway");
+        gateway.setMinimumNodeCount(2);
+        InstanceGroupV4Response solrhg = new InstanceGroupV4Response();
+        solrhg.setName("solrhg");
+        solrhg.setMinimumNodeCount(0);
+        stackV4Response.setInstanceGroups(List.of(core, auxiliary, gateway, solrhg));
+        stackV4Response.setStatus(Status.AVAILABLE);
         return stackV4Response;
     }