Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-84852: C-EMA restart resilience for scan jobs #208

Merged
merged 20 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions service/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<spring-kafka.version>3.2.4</spring-kafka.version>
<kafka-clients.version>3.5.0</kafka-clients.version>
<jackson.version>2.16.1</jackson.version>
<awaitility.version>4.2.0</awaitility.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
</properties>
<repositories>
Expand Down Expand Up @@ -290,6 +291,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>

<!-- for testing -->
<dependency>
Expand All @@ -298,12 +303,7 @@
<version>2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.solacesystems</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class EventPortalProperties {
private Boolean managed = false;
private String incomingRequestQueueName;

private int waitAckScanCompleteTimeoutSec = 300;
private int waitAckScanCompletePollIntervalSec = 10;

private GatewayProperties gateway
= new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of()));
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.solace.maas.ep.event.management.agent.repository.scan;

import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanStatusEntity;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanTypeEntity;
import org.springframework.data.repository.CrudRepository;

public interface ScanStatusRepository extends CrudRepository<ScanStatusEntity, String> {

ScanStatusEntity findByScanType(ScanTypeEntity scanType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.Optional;

@Repository
public interface ScanTypeRepository extends CrudRepository<ScanTypeEntity, String> {
Optional<ScanTypeEntity> findByNameAndScanId(String name, String scanId);
List<ScanTypeEntity> findAllByScanId(String scanId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public String scan(ScanRequestBO scanRequestBO) {
).stream()
.findFirst()
.orElseThrow())
.collect(Collectors.toUnmodifiableList());
.toList();

List<String> brokerScanTypes = scanRequestBO.getScanTypes();
List<RouteBundle> routes = brokerScanTypes.stream()
Expand All @@ -111,15 +111,14 @@ public String scan(ScanRequestBO scanRequestBO) {
.filter(Objects::nonNull)
.filter(list -> !list.isEmpty())
.toList().stream()
)
.toList().stream().flatMap(List::stream).toList();
).toList().stream().flatMap(List::stream).toList();

return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
}

public void handleError(Exception e, ScanCommandMessage message){
public void handleError(Exception e, ScanCommandMessage message) {

if( scanStatusPublisherOpt.isEmpty()){
if (scanStatusPublisherOpt.isEmpty()) {
return;
}
ScanStatusPublisher scanStatusPublisher = scanStatusPublisherOpt.get();
Expand All @@ -140,7 +139,7 @@ public void handleError(Exception e, ScanCommandMessage message){
"orgId", orgId,
"runtimeAgentId", runtimeAgentId
);
scanStatusPublisher.sendOverallScanStatus(response,topicVars);
scanStatusPublisher.sendOverallScanStatus(response, topicVars);
}

private MessagingServiceEntity retrieveMessagingServiceEntity(String messagingServiceId) {
Expand All @@ -158,4 +157,9 @@ public Page<ScanItemBO> findAll(Pageable pageable) {
public Page<ScanItemBO> findByMessagingServiceId(String messagingServiceId, Pageable pageable) {
return scanService.findByMessagingServiceId(messagingServiceId, pageable);
}


public boolean isScanComplete(String scanId) {
return scanService.isScanComplete(scanId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.MDC;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand All @@ -36,6 +37,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -449,4 +451,22 @@ protected RouteBundleHierarchyStore registerRouteRecipients(RouteBundle routeBun
}
return pathStore;
}

public boolean isScanComplete(String scanId) {
Set<String> completeScanStatuses = Set.of(
mynecker marked this conversation as resolved.
Show resolved Hide resolved
ScanStatus.COMPLETE.name(),
ScanStatus.FAILED.name(),
ScanStatus.TIMED_OUT.name()
);


List<ScanTypeEntity> allScanTypes = scanTypeRepository.findAllByScanId(scanId);
if (CollectionUtils.isEmpty(allScanTypes)) {
return false;
}
return allScanTypes.stream()
.map(scanStatusRepository::findByScanType)
.allMatch(status -> completeScanStatuses.contains(status.getStatus()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants;
import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.MessageProcessor;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import com.solace.messaging.MessagingService;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
Expand All @@ -14,6 +15,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.util.HashMap;
Expand All @@ -32,7 +34,9 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp
private final Map<Class, MessageProcessor> messageProcessorsByClassType;
private final MessagingService messagingService;
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor executor;
@Getter
@SuppressWarnings("PMD.MutableStaticState")
private PersistentMessageReceiver persistentMessageReceiver;

protected SolacePersistentMessageHandler(MessagingService messagingService,
Expand All @@ -44,22 +48,30 @@ protected SolacePersistentMessageHandler(MessagingService messagingService,
this.eventPortalProperties = eventPortalProperties;
messageProcessorsByClassType = messageProcessorList.stream()
.collect(Collectors.toMap(MessageProcessor::supportedClass, Function.identity()));
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
executor.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
executor.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize());
executor.setThreadNamePrefix("solace-persistent-message-handler-pool-");
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();

}

@Override
public void onMessage(InboundMessage inboundMessage) {
executor.submit(() -> processMessage(inboundMessage));
}


private void processMessage(InboundMessage inboundMessage) {
String mopMessageSubclass = "";
MessageProcessor processor = null;
Object message = null;
try {
mopMessageSubclass = inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER);
String messageAsString = inboundMessage.getPayloadAsString();
Class messageClass = cachedJSONDecoders.get(mopMessageSubclass);
if (messageClass == null) {
messageClass = Class.forName(mopMessageSubclass);
cachedJSONDecoders.put(mopMessageSubclass, messageClass);
}
Class messageClass = cachedJSONDecoders.computeIfAbsent(mopMessageSubclass, this::loadClass);
processor = messageProcessorsByClassType.get(messageClass);
if (processor == null) {
throw new UnsupportedOperationException("Could not find message processor for message of class " + messageClass.getCanonicalName());
Expand All @@ -68,21 +80,37 @@ public void onMessage(InboundMessage inboundMessage) {
log.trace("onMessage: {}\n{}", messageClass, messageAsString);
message = toMessage(messageAsString, messageClass);
processor.processMessage(processor.castToMessageClass(message));

} catch (Exception e) {
if (processor != null && message != null) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass);
try {
processor.onFailure(e, processor.castToMessageClass(message));
} catch (Exception e1) {
log.error("error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e);
}

} else {
log.error("Unsupported message and/or processor encountered. Skipping processing", e);
handleProcessingError(mopMessageSubclass, processor, message, e);
} finally {
acknowledgeMessage(inboundMessage);
}
}

private Class loadClass(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
log.error("Failed to load class: {}", className, e);
throw new RuntimeException("Failed to load class: " + className, e);
}
}

private void handleProcessingError(String mopMessageSubclass, MessageProcessor processor, Object message, Exception e) {
if (processor != null && message != null) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass, e);
try {
processor.onFailure(e, processor.castToMessageClass(message));
} catch (Exception e1) {
log.error("Error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e1);
}
} else {
log.error("Unsupported message and/or processor encountered. Skipping processing", e);
}
}

} finally {
private void acknowledgeMessage(InboundMessage inboundMessage) {
synchronized (persistentMessageReceiver) {
persistentMessageReceiver.ack(inboundMessage);
}
}
Expand All @@ -103,6 +131,7 @@ private Queue determineQueue() {
return Queue.durableNonExclusiveQueue(eventPortalProperties.getIncomingRequestQueueName());
}


@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
Queue queue = determineQueue();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
package com.solace.maas.ep.event.management.agent.subscriber.messageProcessors;

import com.solace.maas.ep.common.messages.ScanCommandMessage;
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.scanManager.ScanManager;
import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.MDC;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_RECEIVED;
import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Slf4j
@Component
Expand All @@ -27,11 +30,14 @@ public class ScanCommandMessageProcessor implements MessageProcessor<ScanCommand
private final ScanManager scanManager;
private final DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper;
private final MeterRegistry meterRegistry;
private final EventPortalProperties eventPortalProperties;

public ScanCommandMessageProcessor(ScanManager scanManager,
EventPortalProperties eventPortalProperties,
DynamicResourceConfigurationHelper dynamicResourceConfigurationHelper,
MeterRegistry meterRegistry) {
this.scanManager = scanManager;
this.eventPortalProperties = eventPortalProperties;
this.dynamicResourceConfigurationHelper = dynamicResourceConfigurationHelper;
this.meterRegistry = meterRegistry;
}
Expand Down Expand Up @@ -78,6 +84,36 @@ public void processMessage(ScanCommandMessage message) {
log.info("Received scan request {}. Request details: {}", scanRequestBO.getScanId(), scanRequestBO);

scanManager.scan(scanRequestBO);
//if managed, wait for scan to complete
if (Boolean.TRUE.equals(eventPortalProperties.getManaged())) {
log.debug("Waiting for scan to complete for scanId: {}", scanId);
waitForScanCompletion(scanId);
}
}

public void waitForScanCompletion(String scanId) {
try {
await()
.atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec(), SECONDS)
.pollInterval(eventPortalProperties.getWaitAckScanCompletePollIntervalSec(), SECONDS)
.pollInSameThread()
.until(() -> {
try {
log.debug("Checking if scan with id {} is completed", scanId);
return waitUntilScanIsCompleted(scanId);
} catch (Exception e) {
log.error("Error while waiting for scan to complete", e);
return false;
}
});
} catch (ConditionTimeoutException e) {
// Handle the timeout scenario as needed
log.error("Scan with id {} did not complete within the expected time", scanId);
}
}

private boolean waitUntilScanIsCompleted(String scanId) {
return scanManager.isScanComplete(scanId);
}

@Override
Expand All @@ -92,6 +128,6 @@ public ScanCommandMessage castToMessageClass(Object message) {

@Override
public void onFailure(Exception e, ScanCommandMessage message) {
scanManager.handleError(e,message);
scanManager.handleError(e, message);
}
}
Loading
Loading