Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAGO-84852: C-EMA restart resilience for scan jobs #208

Merged
merged 20 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions service/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<spring-kafka.version>3.2.4</spring-kafka.version>
<kafka-clients.version>3.5.0</kafka-clients.version>
<jackson.version>2.16.1</jackson.version>
<awaitility.version>4.2.0</awaitility.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
</properties>
<repositories>
Expand Down Expand Up @@ -290,6 +291,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>

<!-- for testing -->
<dependency>
Expand All @@ -298,12 +303,7 @@
<version>2.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.solacesystems</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class EventPortalProperties {
private Boolean managed = false;
private String incomingRequestQueueName;

private int waitAckScanCompleteTimeoutSec = 300;
private int waitAckScanCompletePollIntervalSec = 10;

private GatewayProperties gateway
= new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of()));
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.solace.maas.ep.event.management.agent.repository.scan;

import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanStatusEntity;
import com.solace.maas.ep.event.management.agent.repository.model.scan.ScanTypeEntity;
import org.springframework.data.repository.CrudRepository;

public interface ScanStatusRepository extends CrudRepository<ScanStatusEntity, String> {

ScanStatusEntity findByScanType(ScanTypeEntity scanType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.Optional;

@Repository
public interface ScanTypeRepository extends CrudRepository<ScanTypeEntity, String> {
Optional<ScanTypeEntity> findByNameAndScanId(String name, String scanId);
List<ScanTypeEntity> findAllByScanId(String scanId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl;
import com.solace.maas.ep.event.management.agent.service.ScanService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -100,7 +101,7 @@ public String scan(ScanRequestBO scanRequestBO) {
).stream()
.findFirst()
.orElseThrow())
.collect(Collectors.toUnmodifiableList());
.toList();

List<String> brokerScanTypes = scanRequestBO.getScanTypes();
List<RouteBundle> routes = brokerScanTypes.stream()
Expand All @@ -111,15 +112,14 @@ public String scan(ScanRequestBO scanRequestBO) {
.filter(Objects::nonNull)
.filter(list -> !list.isEmpty())
.toList().stream()
)
.toList().stream().flatMap(List::stream).toList();
).toList().stream().flatMap(List::stream).toList();

return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId);
}

public void handleError(Exception e, ScanCommandMessage message){
public void handleError(Exception e, ScanCommandMessage message) {

if( scanStatusPublisherOpt.isEmpty()){
if (scanStatusPublisherOpt.isEmpty()) {
return;
}
ScanStatusPublisher scanStatusPublisher = scanStatusPublisherOpt.get();
Expand All @@ -140,7 +140,7 @@ public void handleError(Exception e, ScanCommandMessage message){
"orgId", orgId,
"runtimeAgentId", runtimeAgentId
);
scanStatusPublisher.sendOverallScanStatus(response,topicVars);
scanStatusPublisher.sendOverallScanStatus(response, topicVars);
}

private MessagingServiceEntity retrieveMessagingServiceEntity(String messagingServiceId) {
Expand All @@ -158,4 +158,12 @@ public Page<ScanItemBO> findAll(Pageable pageable) {
public Page<ScanItemBO> findByMessagingServiceId(String messagingServiceId, Pageable pageable) {
return scanService.findByMessagingServiceId(messagingServiceId, pageable);
}


public boolean isScanComplete(String scanId) {
if (ObjectUtils.isEmpty(scanId)) {
throw new IllegalArgumentException("Scan ID cannot be null or empty");
}
return scanService.isScanComplete(scanId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@
import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.MDC;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -449,4 +452,25 @@ protected RouteBundleHierarchyStore registerRouteRecipients(RouteBundle routeBun
}
return pathStore;
}

public boolean isScanComplete(String scanId) {
if (ObjectUtils.isEmpty(scanId)){
throw new IllegalArgumentException("Scan ID cannot be null or empty");
}
Set<String> completeScanStatuses = Set.of(
mynecker marked this conversation as resolved.
Show resolved Hide resolved
ScanStatus.COMPLETE.name(),
ScanStatus.FAILED.name(),
ScanStatus.TIMED_OUT.name()
);


List<ScanTypeEntity> allScanTypes = scanTypeRepository.findAllByScanId(scanId);
if (CollectionUtils.isEmpty(allScanTypes)) {
return false;
}
return allScanTypes.stream()
.map(scanStatusRepository::findByScanType)
.allMatch(status -> completeScanStatuses.contains(status.getStatus()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants;
import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.MessageProcessor;
import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator;
import com.solace.messaging.MessagingService;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.util.HashMap;
Expand All @@ -32,9 +35,15 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp
private final Map<Class, MessageProcessor> messageProcessorsByClassType;
private final MessagingService messagingService;
private final EventPortalProperties eventPortalProperties;
private final ThreadPoolTaskExecutor executor;
@Getter
@SuppressWarnings("PMD.MutableStaticState")
private PersistentMessageReceiver persistentMessageReceiver;

// only used for testing
@Setter
private SolacePersistentMessageHandlerObserver messageHandlerObserver;

protected SolacePersistentMessageHandler(MessagingService messagingService,
EventPortalProperties eventPortalProperties,
List<MessageProcessor> messageProcessorList) {
Expand All @@ -44,22 +53,36 @@ protected SolacePersistentMessageHandler(MessagingService messagingService,
this.eventPortalProperties = eventPortalProperties;
messageProcessorsByClassType = messageProcessorList.stream()
.collect(Collectors.toMap(MessageProcessor::supportedClass, Function.identity()));

executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize());
executor.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize());
executor.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize());
executor.setThreadNamePrefix("solace-persistent-message-handler-pool-");
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
}


@Override
public void onMessage(InboundMessage inboundMessage) {
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageReceived(inboundMessage);
}
executor.submit(() -> processMessage(inboundMessage));
}


private void processMessage(InboundMessage inboundMessage) {
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingInitiated(inboundMessage);
}
String mopMessageSubclass = "";
MessageProcessor processor = null;
Object message = null;
try {
mopMessageSubclass = inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER);
String messageAsString = inboundMessage.getPayloadAsString();
Class messageClass = cachedJSONDecoders.get(mopMessageSubclass);
if (messageClass == null) {
messageClass = Class.forName(mopMessageSubclass);
cachedJSONDecoders.put(mopMessageSubclass, messageClass);
}
Class messageClass = cachedJSONDecoders.computeIfAbsent(mopMessageSubclass, this::loadClass);
processor = messageProcessorsByClassType.get(messageClass);
if (processor == null) {
throw new UnsupportedOperationException("Could not find message processor for message of class " + messageClass.getCanonicalName());
Expand All @@ -68,21 +91,46 @@ public void onMessage(InboundMessage inboundMessage) {
log.trace("onMessage: {}\n{}", messageClass, messageAsString);
message = toMessage(messageAsString, messageClass);
processor.processMessage(processor.castToMessageClass(message));

if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingCompleted(inboundMessage);
}
} catch (Exception e) {
if (processor != null && message != null) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass);
try {
processor.onFailure(e, processor.castToMessageClass(message));
} catch (Exception e1) {
log.error("error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e);
}

} else {
log.error("Unsupported message and/or processor encountered. Skipping processing", e);
handleProcessingError(mopMessageSubclass, processor, message, e);
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingFailed(inboundMessage);
}

} finally {
acknowledgeMessage(inboundMessage);
if (messageHandlerObserver != null) {
messageHandlerObserver.onMessageProcessingAcknowledged(inboundMessage);
}
mynecker marked this conversation as resolved.
Show resolved Hide resolved
}
}

private Class loadClass(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
log.error("Failed to load class: {}", className, e);
throw new RuntimeException("Failed to load class: " + className, e);
}
}

private void handleProcessingError(String mopMessageSubclass, MessageProcessor processor, Object message, Exception e) {
if (processor != null && message != null) {
log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass, e);
try {
processor.onFailure(e, processor.castToMessageClass(message));
} catch (Exception e1) {
log.error("Error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e1);
}
} else {
log.error("Unsupported message and/or processor encountered. Skipping processing", e);
}
}

private void acknowledgeMessage(InboundMessage inboundMessage) {
synchronized (persistentMessageReceiver) {
persistentMessageReceiver.ack(inboundMessage);
}
}
Expand All @@ -103,6 +151,7 @@ private Queue determineQueue() {
return Queue.durableNonExclusiveQueue(eventPortalProperties.getIncomingRequestQueueName());
}


@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
Queue queue = determineQueue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.solace.maas.ep.event.management.agent.subscriber;

import com.solace.messaging.receiver.InboundMessage;

/**
* The SolacePersistentMessageHandlerObserver interface defines methods to observe
* the lifecycle of messages handled by a Solace message handler. Implementers can
* use this interface to react to various stages of message processing.
* Primary use case is for testing purposes.
*/
public interface SolacePersistentMessageHandlerObserver {

void onMessageReceived(InboundMessage message);
void onMessageProcessingInitiated(InboundMessage message);
void onMessageProcessingCompleted(InboundMessage message);
void onMessageProcessingAcknowledged(InboundMessage message);
void onMessageProcessingFailed(InboundMessage message);

}
Loading
Loading