Commit c53c3e3
CB-22003 Extends the Datalake horizontal scaling flow with the node status validation

We cannot execute the horizontal scaling if the CM is not reachable. Validations: check the stack status, the stack has to be available; check the CM status, the CM has to be reachable; the target host group has to be scalable. Do not allow downscale for the solr, kafka and storage add-on host groups. Validate that the desired count does not hurt the minimum EDL setup.
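The checks listed above, condensed into a runnable sketch (every name and value below is an illustrative stand-in, not code from this commit):

// A minimal, self-contained sketch of the validation chain described above.
// All identifiers here are hypothetical stand-ins for illustration only.
public class HorizontalScaleValidationSketch {
    public static void main(String[] args) {
        String clusterShape = "ENTERPRISE";   // only the ENTERPRISE shape may scale horizontally
        boolean groupScalable = true;         // the target host group must be marked scalable
        boolean environmentUpscalable = true; // the environment status must allow scaling
        boolean yarnStack = false;            // YCloud (YARN) stacks are rejected
        boolean cmReachable = true;           // the gateway (CM) primary node must be healthy
        int minimumNodeCount = 3;             // per-host-group minimum (e.g. core)
        int desiredCount = 4;                 // requested target size

        boolean accepted = "ENTERPRISE".equals(clusterShape)
                && groupScalable
                && environmentUpscalable
                && !yarnStack
                && cmReachable
                && desiredCount >= minimumNodeCount;
        System.out.println("scaling request accepted: " + accepted); // prints: true
    }
}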
RokolyA94 authored and keyki committed Jul 12, 2023
1 parent e69658b commit c53c3e3
Showing 3 changed files with 103 additions and 29 deletions.
@@ -7,7 +7,7 @@ public enum DatalakeInstanceGroupScalingDetails {
IDBROKER("idbroker", false, 2),
GATEWAY("gateway", false, 2),
AUXILIARY("auxiliary", false, 1),
CORE("core", true, 3),
CORE("core", false, 3),
SOLRHG("solrhg", true, 0),
STORAGEHG("storagehg", true, 0),
KAFKAHG("kafkahg", true, 0),
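Each entry above pairs a host group name with a scalable flag and a minimum node count. A minimal sketch of the enum's assumed backbone (its constructor and accessors are outside this diff, so their exact form is an assumption; the accessor names match the calls in the service below):

// Assumed shape of DatalakeInstanceGroupScalingDetails; only the CORE entry is shown.
public enum DatalakeInstanceGroupScalingDetails {
    CORE("core", false, 3);

    private final String name;
    private final boolean scalable;
    private final int minimumNodeCount;

    DatalakeInstanceGroupScalingDetails(String name, boolean scalable, int minimumNodeCount) {
        this.name = name;
        this.scalable = scalable;
        this.minimumNodeCount = minimumNodeCount;
    }

    public String getName() { return name; }

    public boolean isScalable() { return scalable; }

    public int getMinimumNodeCount() { return minimumNodeCount; }
}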
@@ -1,24 +1,32 @@
package com.sequenceiq.datalake.service.sdx;

import static com.sequenceiq.cloudbreak.common.exception.NotFoundException.notFound;
import static com.sequenceiq.cloudbreak.common.mappable.CloudPlatform.YARN;
import static com.sequenceiq.sdx.api.model.SdxClusterShape.ENTERPRISE;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import javax.inject.Inject;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.WebApplicationException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.StackV4Endpoint;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.base.InstanceGroupV4Base;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.base.InstanceMetadataType;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.base.InstanceStatus;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.request.StackScaleV4Request;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.StackV4Response;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.instancegroup.InstanceGroupV4Response;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.instancegroup.instancemetadata.InstanceMetaDataV4Response;
import com.sequenceiq.cloudbreak.auth.ThreadBasedUserCrnProvider;
import com.sequenceiq.cloudbreak.auth.altus.EntitlementService;
import com.sequenceiq.cloudbreak.auth.crn.RegionAwareInternalCrnGeneratorFactory;
@@ -64,13 +72,29 @@ public class SdxHorizontalScalingService {
@Inject
private CloudbreakFlowService cloudbreakFlowService;

private static Map<String, Set<InstanceMetaDataV4Response>> getInstanceMap(StackV4Response stack, String target, String gatewayInstanceName) {
Map<String, Set<InstanceMetaDataV4Response>> instanceMap = stack.getInstanceGroups().stream()
.filter(instance -> instance.getName().equalsIgnoreCase(target) || instance.getName().equalsIgnoreCase(gatewayInstanceName))
.collect(Collectors.toMap(instance -> instance.getName().toLowerCase(Locale.ROOT), InstanceGroupV4Response::getMetadata));
return instanceMap;
}

private List<InstanceMetaDataV4Response> getGatewayUnhealthyMetadata(String gatewayInstanceName, Map<String, Set<InstanceMetaDataV4Response>> instanceMap) {
List<InstanceMetaDataV4Response> gatewayMetaData = instanceMap.get(gatewayInstanceName).stream()
.filter(data -> data.getInstanceType() == InstanceMetadataType.GATEWAY_PRIMARY &&
(!data.getInstanceStatus().equals(InstanceStatus.SERVICES_HEALTHY)
|| !data.getInstanceStatus().equals(InstanceStatus.SERVICES_RUNNING)))
.toList();
return gatewayMetaData;
}

public FlowIdentifier horizontalScaleDatalake(String name, DatalakeHorizontalScaleRequest scaleRequest) {
String accountId = ThreadBasedUserCrnProvider.getAccountId();
if (!entitlementService.isDatalakeHorizontalScaleEnabled(accountId)) {
throw new BadRequestException(String.format("Datalake horizontal scale is not enabled for account %s.", accountId));
throw new BadRequestException(String.format("Data lake horizontal scale is not enabled for account %s.", accountId));
}

LOGGER.info("Horizontal scaling Datalake with name {}", name);
LOGGER.info("Horizontal scaling Data lake with name {}", name);
SdxCluster sdxCluster = sdxClusterRepository.findByAccountIdAndClusterNameAndDeletedIsNullAndDetachedIsFalse(accountId, name)
.orElseThrow(() -> notFound("SDX cluster", name).get());
return sdxReactorFlowManager.triggerHorizontalScaleDataLake(sdxCluster, scaleRequest);
@@ -99,46 +123,87 @@ public String triggerScalingFlow(SdxCluster sdxCluster, StackScaleV4Request scal
}

public void validateHorizontalScaleRequest(SdxCluster sdxCluster, DatalakeHorizontalScaleRequest request) {
- if (!ENTERPRISE.equals(sdxCluster.getClusterShape())) {
- throw new BadRequestException(String.format("scaling not supported on: %s. " +
- "Only the ENTERPRISE Datalake shape support the horizontal scaling functionality.", sdxCluster.getClusterShape().name()));
+ if (ENTERPRISE != sdxCluster.getClusterShape()) {
+ throw new BadRequestException(String.format("Horizontal scaling not supported on: %s. Please use ENTERPRISE Data lake shape",
+ sdxCluster.getClusterShape().name()));
}
String hostGroup = request.getGroup();
if (!DatalakeInstanceGroupScalingDetails.valueOf(hostGroup.toUpperCase(Locale.ROOT)).isScalable()) {
throw new BadRequestException(String.format("%s host group cannot scale", hostGroup));
throw new BadRequestException(String.format("%s hostgroup cannot be scaled", hostGroup));
}
DetailedEnvironmentResponse environmentResponse = environmentClientService.getByName(sdxCluster.getEnvName());
if (!EnvironmentStatus.upscalable().contains(environmentResponse.getEnvironmentStatus())) {
throw new BadRequestException(String.format("The DAtalake cannot be scaled because the environment %s is in the %s status.",
sdxCluster.getEnvName(), environmentResponse.getEnvironmentStatus().name()));
throw new BadRequestException(String.format("The Data lake cannot be scaled in the %s environment status.",
environmentResponse.getEnvironmentStatus().name()));
}
StackV4Response stackResponse = stackV4Endpoint.getByCrn(1L, sdxCluster.getStackCrn(), Set.of());
- String nodeRequestValidity = isNodeRequestValid(stackResponse, request);
- if (StringUtils.isNotEmpty(nodeRequestValidity)) {
- throw new BadRequestException(String.format("The requested nodeCount does not reach the minimum condition of horizontal scaling. Issues: %s",
- nodeRequestValidity));
+ if (YARN == stackResponse.getCloudPlatform()) {
+ throw new BadRequestException("The Data lake horizontal scaling is not supported on YCloud");
+ }
+ Optional<String> nodeRequestValid = validateNodeRequest(stackResponse, request);
+ if (nodeRequestValid.isPresent()) {
+ throw new BadRequestException(nodeRequestValid.get());
}
}

- private String isNodeRequestValid(StackV4Response stack, DatalakeHorizontalScaleRequest request) {
+ private Optional<String> validateNodeRequest(StackV4Response stack, DatalakeHorizontalScaleRequest request) {
if (request.getDesiredCount() < 0) {
LOGGER.warn("Negative nodeCount was requested.");
return "Negative nodeCount is not accepted. If you want to downsacle your cluster please use lower number than the acutal node count";
return Optional.of("Negative nodeCount is not accepted. If you want to downscale your Data lake please " +
"use lower number than the actual node count");
}
String target = request.getGroup();
- boolean hostGroupNotExists = stack.getInstanceGroups().stream()
- .map(InstanceGroupV4Response::getName)
- .noneMatch(target::equalsIgnoreCase);
- if (hostGroupNotExists) {
+ String gatewayInstanceName = DatalakeInstanceGroupScalingDetails.GATEWAY.getName().toLowerCase(Locale.ROOT);
+ Map<String, Set<InstanceMetaDataV4Response>> instanceMap = getInstanceMap(stack, target, gatewayInstanceName);
+ if (!instanceMap.containsKey(target)) {
LOGGER.warn("The requested hostgroup name not found!");
- return String.format("The requested hostgroup name not found! The requested host group name is %s", target);
+ return Optional.of(String.format("The requested hostgroup name was not found! The requested hostgroup name is %s", target));
}
if (!instanceMap.containsKey(gatewayInstanceName)) {
LOGGER.error("Gateway node is not found in the Stack Instance Map. The horizontal scale cannot be performed while the CM is not " +
"available. StackId: {}", stack.getId());
List<String> instanceGroupList = stack.getInstanceGroups().stream().map(InstanceGroupV4Base::getName).toList();
return Optional.of(String.format("The Gateway instance was not found. The horizontal scale cannot be performed while the CM is not reachable" +
", found instances: %s", instanceGroupList));
}
List<InstanceMetaDataV4Response> gatewayMetaData = getGatewayUnhealthyMetadata(gatewayInstanceName, instanceMap);
if (!gatewayMetaData.isEmpty()) {
LOGGER.warn("Gateway (CM node) is not healthy. Cannot start horizontal scaling.");
List<InstanceStatus> instanceStatuses = gatewayMetaData.stream().map(InstanceMetaDataV4Response::getInstanceStatus).toList();
return Optional.of(String.format("Gateway instances are not healthy. Please repair the node and retry horizontal" +
" scaling. Actual Gateway statuses: %s", instanceStatuses));
}
- DatalakeInstanceGroupScalingDetails instanceGroupName = DatalakeInstanceGroupScalingDetails.valueOf(request.getGroup().toUpperCase(Locale.ROOT));
- if (instanceGroupName.getMinimumNodeCount() > request.getDesiredCount()) {
+ DatalakeInstanceGroupScalingDetails targetInstanceGroupName = DatalakeInstanceGroupScalingDetails.valueOf(request.getGroup().toUpperCase(Locale.ROOT));
+ if (targetInstanceGroupName.getMinimumNodeCount() > request.getDesiredCount()) {
LOGGER.warn("Requested nodeCount is less than the minimum nodeCount.");
- return String.format("Requested nodeCount is less than the minimum nodeCount. The minimum node cont for %s is %d",
- instanceGroupName.name(), instanceGroupName.getMinimumNodeCount());
+ return Optional.of(String.format("Requested node count is less than the minimum node count. The minimum node count for %s is %d",
+ targetInstanceGroupName.name(), targetInstanceGroupName.getMinimumNodeCount()));
}
return "";
if (isDownscaleBlocked(targetInstanceGroupName) && isDownscale(stack, request)) {
return Optional.of("The storage hostgroup down scale is not supported, because it can cause data loss");
}
return Optional.empty();
}

private boolean isDownscaleBlocked(DatalakeInstanceGroupScalingDetails targetInstanceGroupName) {
// Downscaling the solr, kafka and storage add-on host groups can cause data loss, so it is blocked.
return switch (targetInstanceGroupName) {
case STORAGEHG, KAFKAHG, SOLRHG -> true;
default -> false;
};
}

private boolean isInstanceRunning(InstanceStatus status) {
return switch (status) {
case CREATED, SERVICES_RUNNING, SERVICES_HEALTHY, DECOMMISSIONED, DECOMMISSION_FAILED -> true;
default -> false;
};
}

private boolean isDownscale(StackV4Response stack, DatalakeHorizontalScaleRequest request) {
return stack.getInstanceGroups().stream()
.filter(instance -> instance.getName().equals(request.getGroup().toLowerCase(Locale.ROOT)))
.map(InstanceGroupV4Base::getNodeCount)
.mapToInt(Integer::intValue)
.sum() > request.getDesiredCount();
}
}
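A worked example of the downscale arithmetic in isDownscale(): the node counts of the target host group are summed and compared with the requested desired count (the concrete numbers are assumptions for illustration):

// Hypothetical, self-contained illustration of the downscale check above.
public class DownscaleCheckExample {
    public static void main(String[] args) {
        int currentNodeCount = 5; // assumed current size of the "solrhg" host group
        int desiredCount = 3;     // assumed DatalakeHorizontalScaleRequest#getDesiredCount()
        boolean downscale = currentNodeCount > desiredCount;
        // For solrhg, kafkahg and storagehg a downscale is rejected to avoid data loss.
        System.out.println("downscale requested: " + downscale); // prints: true
    }
}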
@@ -23,6 +23,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.sequenceiq.cloudbreak.api.endpoint.v4.common.Status;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.StackV4Endpoint;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.request.StackScaleV4Request;
import com.sequenceiq.cloudbreak.api.endpoint.v4.stacks.response.StackV4Response;
@@ -114,9 +115,10 @@ void testDatalakeHorizontalScaleValidation() {
scaleRequest.setDesiredCount(0);
assertThrows(BadRequestException.class, () -> underTest.validateHorizontalScaleRequest(sdxCluster, scaleRequest));
scaleRequest.setGroup("core");
- scaleRequest.setDesiredCount(1);
+ scaleRequest.setDesiredCount(3);
assertThrows(BadRequestException.class, () -> underTest.validateHorizontalScaleRequest(sdxCluster, scaleRequest));
- scaleRequest.setDesiredCount(4);
+ scaleRequest.setGroup("solrhg");
+ scaleRequest.setDesiredCount(3);
underTest.validateHorizontalScaleRequest(sdxCluster, scaleRequest);
}

@@ -136,7 +138,14 @@ private StackV4Response getStackV4Response() {
InstanceGroupV4Response auxiliary = new InstanceGroupV4Response();
auxiliary.setName("auxiliary");
auxiliary.setMinimumNodeCount(1);
- stackV4Response.setInstanceGroups(List.of(core, auxiliary));
+ InstanceGroupV4Response gateway = new InstanceGroupV4Response();
+ gateway.setName("gateway");
+ gateway.setMinimumNodeCount(2);
+ InstanceGroupV4Response solrhg = new InstanceGroupV4Response();
+ solrhg.setName("solrhg");
+ solrhg.setMinimumNodeCount(0);
+ stackV4Response.setInstanceGroups(List.of(core, auxiliary, gateway, solrhg));
+ stackV4Response.setStatus(Status.AVAILABLE);
return stackV4Response;
}
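// Note: the fixture above now also contains gateway and solrhg groups and an AVAILABLE
// stack status, so the happy-path call at the end of testDatalakeHorizontalScaleValidation()
// can pass the gateway-presence check while the core and solrhg cases exercise the
// scalability and minimum-node-count branches.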

