From 1ddbacc8cfca5861582644b1969d60549ed27f9d Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 16 Oct 2024 15:55:53 +0200 Subject: [PATCH 01/20] added resilience for EMA restarts --- .../eventPortal/EventPortalProperties.java | 3 + .../agent/publisher/ScanStatusPublisher.java | 18 +- .../repository/scan/ScanStatusRepository.java | 3 + .../repository/scan/ScanTypeRepository.java | 2 + .../agent/scanManager/ScanManager.java | 15 +- .../management/agent/service/ScanService.java | 39 ++-- .../SolacePersistentMessageHandler.java | 64 ++++-- .../ScanCommandMessageProcessor.java | 54 +++-- .../agent/service/ScanServiceTests.java | 14 +- .../CommandPersistentMessageHandlerTests.java | 139 +++++++++++++ .../PersistentMessageHandlerTests.java | 85 ++++---- .../ScanJobPersistentMessageHandlerTests.java | 187 ++++++++++++++++++ 12 files changed, 497 insertions(+), 126 deletions(-) create mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java create mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java index dbf0e4841..c1b76ac72 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java @@ -24,6 +24,9 @@ public class EventPortalProperties { private Boolean managed = false; private String incomingRequestQueueName; + private int waitAckScanCompleteTimeout = 300; + private int waitAckScanCompletePollInterval = 10; + private GatewayProperties gateway = new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of())); } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java index 077a63fc7..58d2cee0a 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java @@ -6,7 +6,7 @@ import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher; import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanOverallStatusException; import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanStatusException; -import io.micrometer.core.instrument.MeterRegistry; +import com.solace.maas.ep.event.management.agent.subscriber.SolacePersistentMessageHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -15,23 +15,15 @@ import java.util.List; import java.util.Map; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG; - @Slf4j @Component @ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") public class ScanStatusPublisher { private final SolacePublisher solacePublisher; - private final MeterRegistry meterRegistry; - public ScanStatusPublisher(SolacePublisher solacePublisher, - MeterRegistry meterRegistry) { + public ScanStatusPublisher(SolacePublisher solacePublisher) { this.solacePublisher = solacePublisher; - this.meterRegistry = meterRegistry; } /** @@ -55,9 +47,6 @@ public void sendOverallScanStatus(ScanStatusMessage message, Map } catch (Exception e) { throw new ScanOverallStatusException("Over all status exception: " + e.getMessage(), Map.of(scanId, List.of(e)), "Overall status", Arrays.asList(scanType.split(",")), ScanStatus.valueOf(status)); - } finally { - meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status, SCAN_ID_TAG, scanId, - ORG_ID_TAG, topicDetails.get("orgId")).increment(); } } @@ -84,9 +73,6 @@ public void sendScanDataStatus(ScanDataStatusMessage message, Map { + + ScanStatusEntity findByScanType(ScanTypeEntity scanType); } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java index b41a16823..3f88f0d5e 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/repository/scan/ScanTypeRepository.java @@ -4,9 +4,11 @@ import org.springframework.data.repository.CrudRepository; import org.springframework.stereotype.Repository; +import java.util.List; import java.util.Optional; @Repository public interface ScanTypeRepository extends CrudRepository { Optional findByNameAndScanId(String name, String scanId); + List findAllByScanId(String scanId); } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java index 2f3e2d1d8..428042592 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java @@ -110,16 +110,16 @@ public String scan(ScanRequestBO scanRequestBO) { brokerScanType, e.getKey())) .filter(Objects::nonNull) .filter(list -> !list.isEmpty()) - .toList().stream() + .collect(Collectors.toList()).stream() ) - .toList().stream().flatMap(List::stream).toList(); + .collect(Collectors.toList()).stream().flatMap(List::stream).collect(Collectors.toList()); return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId); } - public void handleError(Exception e, ScanCommandMessage message){ + public void handleError(Exception e, ScanCommandMessage message) { - if( scanStatusPublisherOpt.isEmpty()){ + if (scanStatusPublisherOpt.isEmpty()) { return; } ScanStatusPublisher scanStatusPublisher = scanStatusPublisherOpt.get(); @@ -140,7 +140,7 @@ public void handleError(Exception e, ScanCommandMessage message){ "orgId", orgId, "runtimeAgentId", runtimeAgentId ); - scanStatusPublisher.sendOverallScanStatus(response,topicVars); + scanStatusPublisher.sendOverallScanStatus(response, topicVars); } private MessagingServiceEntity retrieveMessagingServiceEntity(String messagingServiceId) { @@ -158,4 +158,9 @@ public Page findAll(Pageable pageable) { public Page findByMessagingServiceId(String messagingServiceId, Pageable pageable) { return scanService.findByMessagingServiceId(messagingServiceId, pageable); } + + + public boolean isScanComplete(String scanId) { + return scanService.isScanComplete(scanId); + } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java index 772dd8943..2099e11d8 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java @@ -21,11 +21,11 @@ import com.solace.maas.ep.event.management.agent.scanManager.model.ScanItemBO; import com.solace.maas.ep.event.management.agent.scanManager.model.ScanTypeBO; import com.solace.maas.ep.event.management.agent.util.IDGenerator; -import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils; import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; +import org.apache.commons.collections4.CollectionUtils; import org.slf4j.MDC; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -36,15 +36,12 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG; - /** * Responsible for initiating and managing Messaging Service scans. */ @@ -67,17 +64,11 @@ public class ScanService { private final IDGenerator idGenerator; - private final MeterRegistry meterRegistry; - public ScanService(ScanRepository repository, ScanRecipientHierarchyRepository scanRecipientHierarchyRepository, - ScanTypeRepository scanTypeRepository, - ScanStatusRepository scanStatusRepository, - ScanRouteService scanRouteService, - RouteService routeService, - ProducerTemplate producerTemplate, - IDGenerator idGenerator, - MeterRegistry meterRegistry) { + ScanTypeRepository scanTypeRepository, ScanStatusRepository scanStatusRepository, ScanRouteService scanRouteService, + RouteService routeService, ProducerTemplate producerTemplate, + IDGenerator idGenerator) { this.repository = repository; this.scanRecipientHierarchyRepository = scanRecipientHierarchyRepository; this.scanTypeRepository = scanTypeRepository; @@ -86,7 +77,6 @@ public ScanService(ScanRepository repository, this.routeService = routeService; this.producerTemplate = producerTemplate; this.idGenerator = idGenerator; - this.meterRegistry = meterRegistry; } /** @@ -296,7 +286,6 @@ public void sendScanStatus(String groupId, String scanId, String traceId, String exchange.getIn().setHeader(RouteConstants.SCAN_STATUS, status); exchange.getIn().setHeader(RouteConstants.SCAN_STATUS_DESC, ""); }); - meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status.name(), SCAN_ID_TAG, scanId).increment(); } protected CompletableFuture scanAsync(String groupId, String scanId, String traceId, String actorId, @@ -449,4 +438,22 @@ protected RouteBundleHierarchyStore registerRouteRecipients(RouteBundle routeBun } return pathStore; } + + public boolean isScanComplete(String scanId) { + Set completeScanStatuses = Set.of( + ScanStatus.COMPLETE.name(), + ScanStatus.FAILED.name(), + ScanStatus.TIMED_OUT.name() + ); + + + List allScanTypes = scanTypeRepository.findAllByScanId(scanId); + if (CollectionUtils.isEmpty(allScanTypes)) { + return false; + } + return allScanTypes.stream() + .map(scanStatusRepository::findByScanType) + .allMatch(status -> completeScanStatuses.contains(status.getStatus())); + + } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index d575f6a47..15737f396 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -1,8 +1,10 @@ package com.solace.maas.ep.event.management.agent.subscriber; +import com.solace.maas.ep.common.messages.ScanCommandMessage; import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants; import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.MessageProcessor; +import com.solace.maas.ep.event.management.agent.util.MdcTaskDecorator; import com.solace.messaging.MessagingService; import com.solace.messaging.receiver.InboundMessage; import com.solace.messaging.receiver.MessageReceiver; @@ -14,6 +16,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.util.HashMap; @@ -32,8 +35,10 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp private final Map messageProcessorsByClassType; private final MessagingService messagingService; private final EventPortalProperties eventPortalProperties; + private final ThreadPoolTaskExecutor executor; @Getter private PersistentMessageReceiver persistentMessageReceiver; + public static Map seenScanIds = new HashMap<>(); protected SolacePersistentMessageHandler(MessagingService messagingService, EventPortalProperties eventPortalProperties, @@ -44,22 +49,30 @@ protected SolacePersistentMessageHandler(MessagingService messagingService, this.eventPortalProperties = eventPortalProperties; messageProcessorsByClassType = messageProcessorList.stream() .collect(Collectors.toMap(MessageProcessor::supportedClass, Function.identity())); + executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(eventPortalProperties.getCommandThreadPoolMinSize()); + executor.setMaxPoolSize(eventPortalProperties.getCommandThreadPoolMaxSize()); + executor.setQueueCapacity(eventPortalProperties.getCommandThreadPoolQueueSize()); + executor.setThreadNamePrefix("solace-persistent-message-handler-pool-"); + executor.setTaskDecorator(new MdcTaskDecorator()); + executor.initialize(); } @Override public void onMessage(InboundMessage inboundMessage) { + executor.submit(() -> processMessage(inboundMessage)); + } + + + private void processMessage(InboundMessage inboundMessage) { String mopMessageSubclass = ""; MessageProcessor processor = null; Object message = null; try { mopMessageSubclass = inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER); String messageAsString = inboundMessage.getPayloadAsString(); - Class messageClass = cachedJSONDecoders.get(mopMessageSubclass); - if (messageClass == null) { - messageClass = Class.forName(mopMessageSubclass); - cachedJSONDecoders.put(mopMessageSubclass, messageClass); - } + Class messageClass = cachedJSONDecoders.computeIfAbsent(mopMessageSubclass, this::loadClass); processor = messageProcessorsByClassType.get(messageClass); if (processor == null) { throw new UnsupportedOperationException("Could not find message processor for message of class " + messageClass.getCanonicalName()); @@ -68,21 +81,37 @@ public void onMessage(InboundMessage inboundMessage) { log.trace("onMessage: {}\n{}", messageClass, messageAsString); message = toMessage(messageAsString, messageClass); processor.processMessage(processor.castToMessageClass(message)); - } catch (Exception e) { - if (processor != null && message != null) { - log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass); - try { - processor.onFailure(e, processor.castToMessageClass(message)); - } catch (Exception e1) { - log.error("error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e); - } - - } else { - log.error("Unsupported message and/or processor encountered. Skipping processing", e); + handleProcessingError(mopMessageSubclass, processor, message, e); + } finally { + acknowledgeMessage(inboundMessage); + } + } + + private Class loadClass(String className) { + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + log.error("Failed to load class: {}", className, e); + throw new RuntimeException("Failed to load class: " + className, e); + } + } + + private void handleProcessingError(String mopMessageSubclass, MessageProcessor processor, Object message, Exception e) { + if (processor != null && message != null) { + log.error("Error while processing inbound message from queue for mopMessageSubclass: {}", mopMessageSubclass, e); + try { + processor.onFailure(e, processor.castToMessageClass(message)); + } catch (Exception e1) { + log.error("Error while handling message processing failure for mopMessageSubclass: {}", mopMessageSubclass, e1); } + } else { + log.error("Unsupported message and/or processor encountered. Skipping processing", e); + } + } - } finally { + private void acknowledgeMessage(InboundMessage inboundMessage) { + synchronized (persistentMessageReceiver) { persistentMessageReceiver.ack(inboundMessage); } } @@ -103,6 +132,7 @@ private Queue determineQueue() { return Queue.durableNonExclusiveQueue(eventPortalProperties.getIncomingRequestQueueName()); } + @Override public void onApplicationEvent(ApplicationReadyEvent event) { Queue queue = determineQueue(); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java index 7bcd5995e..bb79daa52 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java @@ -1,12 +1,13 @@ package com.solace.maas.ep.event.management.agent.subscriber.messageProcessors; import com.solace.maas.ep.common.messages.ScanCommandMessage; +import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO; -import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils; import org.apache.commons.collections4.CollectionUtils; +import org.awaitility.core.ConditionTimeoutException; import org.slf4j.MDC; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -15,8 +16,9 @@ import java.util.List; import java.util.UUID; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_RECEIVED; -import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; @Slf4j @Component @@ -26,21 +28,19 @@ public class ScanCommandMessageProcessor implements MessageProcessor destinations = new ArrayList<>(); List entityTypes = new ArrayList<>(); @@ -68,7 +68,7 @@ public void processMessage(ScanCommandMessage message) { ScanRequestBO scanRequestBO = ScanRequestBO.builder() .messagingServiceId(message.getMessagingServiceId()) - .scanId(scanId) + .scanId(!StringUtils.isEmpty(message.getScanId()) ? message.getScanId() : UUID.randomUUID().toString()) .traceId(message.getTraceId()) .actorId(message.getActorId()) .scanTypes(entityTypes) @@ -77,7 +77,37 @@ public void processMessage(ScanCommandMessage message) { log.info("Received scan request {}. Request details: {}", scanRequestBO.getScanId(), scanRequestBO); - scanManager.scan(scanRequestBO); + String scanId = scanManager.scan(scanRequestBO); + //if managed, wait for scan to complete + if (eventPortalProperties.getManaged()) { + log.debug("Waiting for scan to complete for scanId: {}", scanId); + waitForScanCompletion(scanId); + } + } + + public void waitForScanCompletion(String scanId) { + try { + await() + .atMost(eventPortalProperties.getWaitAckScanCompleteTimeout(), SECONDS) + .pollInterval(eventPortalProperties.getWaitAckScanCompletePollInterval(), SECONDS) + .pollInSameThread() + .until(() -> { + try { + log.debug("Checking if scan with id {} is completed", scanId); + return waitUntilScanIsCompleted(scanId); + } catch (Exception e) { + log.error("Error while waiting for scan to complete", e); + return false; + } + }); + } catch (ConditionTimeoutException e) { + // Handle the timeout scenario as needed + log.error("Scan with id {} did not complete within the expected time", scanId); + } + } + + private boolean waitUntilScanIsCompleted(String scanId) { + return scanManager.isScanComplete(scanId); } @Override @@ -92,6 +122,6 @@ public ScanCommandMessage castToMessageClass(Object message) { @Override public void onFailure(Exception e, ScanCommandMessage message) { - scanManager.handleError(e,message); + scanManager.handleError(e, message); } } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index e7a810b1a..df7924698 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -18,9 +18,6 @@ import com.solace.maas.ep.event.management.agent.repository.scan.ScanTypeRepository; import com.solace.maas.ep.event.management.agent.service.logging.LoggingService; import com.solace.maas.ep.event.management.agent.util.IDGenerator; -import io.micrometer.core.instrument.Meter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.noop.NoopCounter; import lombok.SneakyThrows; import org.apache.camel.Processor; import org.apache.camel.Produce; @@ -98,9 +95,6 @@ public class ScanServiceTests { @Autowired private ScanServiceHelper scanServiceHelper; - @Mock - private MeterRegistry meterRegistry; - @Test @SneakyThrows public void testSingleScanWithRouteBundle() { @@ -149,10 +143,9 @@ public void testSingleScanWithRouteBundle() { .thenReturn(scanType); when(scanStatusRepository.save(scanStatus)) .thenReturn(scanStatus); + when(scanRecipientHierarchyRepository.save(any(ScanRecipientHierarchyEntity.class))) .thenReturn(mock(ScanRecipientHierarchyEntity.class)); - when(meterRegistry.counter(any(), any(String[].class))) - .thenReturn(new NoopCounter(new Meter.Id("noop", null, null, null, null))); scanService.singleScan(List.of(topicListing, consumerGroups, additionalConsumerGroupConfigBundle), "groupId", @@ -317,12 +310,9 @@ public void testParseRouteRecipients() { @Test @SneakyThrows public void testSendScanStatus() { - when(meterRegistry.counter(any(), any(String[].class))) - .thenReturn(new NoopCounter(new Meter.Id("noop", null, null, null, null))); ScanService service = new ScanService(mock(ScanRepository.class), mock(ScanRecipientHierarchyRepository.class), mock(ScanTypeRepository.class), - mock(ScanStatusRepository.class), mock(ScanRouteService.class), mock(RouteService.class), - template, idGenerator, meterRegistry); + mock(ScanStatusRepository.class), mock(ScanRouteService.class), mock(RouteService.class), template, idGenerator); service.sendScanStatus("scanId", "groupId", "messagingServiceId", "traceId", "actorId", "queueListing", ScanStatus.IN_PROGRESS); diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java new file mode 100644 index 000000000..3455427fe --- /dev/null +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java @@ -0,0 +1,139 @@ +package com.solace.maas.ep.event.management.agent.subscriber; + +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.solace.maas.ep.common.messages.CommandMessage; +import com.solace.maas.ep.common.messages.ScanCommandMessage; +import com.solace.maas.ep.common.messages.ScanDataImportMessage; +import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; +import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle; +import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType; +import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants; +import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType; +import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; +import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.CommandMessageProcessor; +import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor; +import com.solace.messaging.MessagingService; +import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.receiver.PersistentMessageReceiver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.ActiveProfiles; + +import java.util.List; + +import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL; +import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL; +import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ActiveProfiles("TEST") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { + "eventPortal.gateway.messaging.standalone=false", + "eventPortal.managed=true", + "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123" +}) +@Slf4j +public class CommandPersistentMessageHandlerTests { + + @MockBean + private ScanManager scanManager; + + @MockBean + private PersistentMessageReceiver persistentMessageReceiver; + + @Autowired + private MessagingService messagingService; + + @SpyBean + private ScanCommandMessageProcessor scanCommandMessageProcessor; + + @SpyBean + private CommandMessageProcessor commandMessageProcessor; + + @Autowired + private EventPortalProperties eventPortalProperties; + + @SpyBean + private SolacePersistentMessageHandler solacePersistentMessageHandler; + + @Autowired + private ObjectMapper objectMapper; + + private InboundMessage inboundMessage; + + private ListAppender listAppender; + + @BeforeEach + void setup() { + Logger scanLogger = (Logger) LoggerFactory.getLogger(SolacePersistentMessageHandler.class); + listAppender = new ListAppender<>(); + listAppender.start(); + scanLogger.addAppender(listAppender); + inboundMessage = mock(InboundMessage.class); + } + + @Test + void testCommandMessageHandler() { + CommandMessage message = new CommandMessage(); + message.setOrigType(MOPSvcType.maasEventMgmt); + message.withMessageType(generic); + message.setContext("abc"); + message.setServiceId("someSvcId"); + message.setActorId("myActorId"); + message.setOrgId(eventPortalProperties.getOrganizationId()); + message.setTraceId("myTraceId"); + message.setCommandCorrelationId("myCorrelationId"); + message.setCommandBundles(List.of( + CommandBundle.builder() + .executionType(ExecutionType.serial) + .exitOnFailure(true) + .commands(List.of()) + .build())); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(message)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + CommandMessage.class.getCanonicalName() + ); + + solacePersistentMessageHandler.onMessage(inboundMessage); + + verify(commandMessageProcessor, times(1)).castToMessageClass(any()); + verify(commandMessageProcessor, times(1)).processMessage(any()); + + // There must be no interaction with scanCommandMessageProcessor + verify(scanCommandMessageProcessor, times(0)).castToMessageClass(any()); + verify(scanCommandMessageProcessor, times(0)).processMessage(any()); + + + verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); + } + + + + + private String jsonString(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } + + } + +} diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java index a3e3d32a4..e9e5fd0b2 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java @@ -18,6 +18,7 @@ import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor; import com.solace.messaging.MessagingService; import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.receiver.PersistentMessageReceiver; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,12 +36,16 @@ import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +/** + * Tests for {@link SolacePersistentMessageHandler} correct dispatch of messages to appropriate processors + */ @ActiveProfiles("TEST") @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "eventPortal.gateway.messaging.standalone=false", @@ -53,6 +58,9 @@ public class PersistentMessageHandlerTests { @MockBean private ScanManager scanManager; + @MockBean + private PersistentMessageReceiver persistentMessageReceiver; + @Autowired private MessagingService messagingService; @@ -84,29 +92,6 @@ void setup() { inboundMessage = mock(InboundMessage.class); } - @Test - void testScanCommandMessageHandler() { - ScanCommandMessage scanCommandMessage = - new ScanCommandMessage("messagingServiceId", - "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); - when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); - when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( - ScanCommandMessage.class.getCanonicalName() - ); - - solacePersistentMessageHandler.onMessage(inboundMessage); - - verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any()); - verify(scanCommandMessageProcessor, times(1)).processMessage(any()); - - // There must be no interaction with commandMessageProcessor - verify(commandMessageProcessor, times(0)).castToMessageClass(any()); - verify(commandMessageProcessor, times(0)).processMessage(any()); - - - verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); - } - @Test void testCommandMessageHandler() { CommandMessage message = new CommandMessage(); @@ -130,6 +115,8 @@ void testCommandMessageHandler() { ); solacePersistentMessageHandler.onMessage(inboundMessage); + // Wait for the executor to process the message + waitForExecutorToProcessMessage(); verify(commandMessageProcessor, times(1)).castToMessageClass(any()); verify(commandMessageProcessor, times(1)).processMessage(any()); @@ -137,9 +124,27 @@ void testCommandMessageHandler() { // There must be no interaction with scanCommandMessageProcessor verify(scanCommandMessageProcessor, times(0)).castToMessageClass(any()); verify(scanCommandMessageProcessor, times(0)).processMessage(any()); + } + @Test + void testScanCommandMessageHandler() { + ScanCommandMessage scanCommandMessage = + new ScanCommandMessage("messagingServiceId", + "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + ScanCommandMessage.class.getCanonicalName() + ); - verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); + solacePersistentMessageHandler.onMessage(inboundMessage); + waitForExecutorToProcessMessage(); + + verify(commandMessageProcessor, times(0)).castToMessageClass(any()); + verify(commandMessageProcessor, times(0)).processMessage(any()); + + // There must be no interaction with scanCommandMessageProcessor + verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any()); + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); } @Test @@ -151,6 +156,7 @@ void testUnsupportedMessageHandling() { ScanDataImportMessage.class.getCanonicalName() ); solacePersistentMessageHandler.onMessage(inboundMessage); + waitForExecutorToProcessMessage(); List logs = listAppender.list; assertThat(logs.get(logs.size() - 1).getFormattedMessage()).isEqualTo("Unsupported message and/or processor encountered. Skipping processing"); verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); @@ -159,33 +165,16 @@ void testUnsupportedMessageHandling() { verify(commandMessageProcessor, times(0)).onFailure(any(), any()); } - - @Test - void testMessageAcknowledgementWhenProcessingError() { - ScanCommandMessage scanCommandMessage = - new ScanCommandMessage("messagingServiceId", - "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); - when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); - - doThrow(new IllegalArgumentException("Test processing error msg")).when(scanCommandMessageProcessor).processMessage(scanCommandMessage); - when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( - ScanCommandMessage.class.getCanonicalName() - ); - - solacePersistentMessageHandler.onMessage(inboundMessage); - List logs = listAppender.list; - assertThat(logs.get(logs.size() - 1).getFormattedMessage()) - .isEqualTo("Error while processing inbound message from queue for mopMessageSubclass: " + ScanCommandMessage.class.getCanonicalName()); - verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); - - // scan command message processor MUST handle the exception - verify(scanCommandMessageProcessor, times(1)).onFailure(any(), any()); - - //commandMessageProcessor MUST do nothing (not a config push command) - verify(commandMessageProcessor, times(0)).onFailure(any(), any()); + private void waitForExecutorToProcessMessage() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + private String jsonString(Object object) { try { return objectMapper.writeValueAsString(object); diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java new file mode 100644 index 000000000..fae01265b --- /dev/null +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -0,0 +1,187 @@ +package com.solace.maas.ep.event.management.agent.subscriber; + +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.solace.maas.ep.common.messages.ScanCommandMessage; +import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; +import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants; +import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; +import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.CommandMessageProcessor; +import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor; +import com.solace.messaging.MessagingService; +import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.receiver.PersistentMessageReceiver; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.ActiveProfiles; + +import java.util.List; + +import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL; +import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ActiveProfiles("TEST") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { + "eventPortal.gateway.messaging.standalone=false", + "eventPortal.managed=true", + "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123", + "eventPortal.waitAckScanCompletePollInterval=1", + "eventPortal.waitAckScanCompleteTimeout=10", + + +}) +@Slf4j +public class ScanJobPersistentMessageHandlerTests { + + @MockBean + private ScanManager scanManager; + + @MockBean + private PersistentMessageReceiver persistentMessageReceiver; + + @Autowired + private MessagingService messagingService; + + @SpyBean + private ScanCommandMessageProcessor scanCommandMessageProcessor; + + @Autowired + private EventPortalProperties eventPortalProperties; + + @SpyBean + private SolacePersistentMessageHandler solacePersistentMessageHandler; + + @Autowired + private ObjectMapper objectMapper; + + private InboundMessage inboundMessage; + + private ListAppender listAppenderPersistentMessageHandler; + private ListAppender listAppendersCanCommandMessageProcessor; + + @BeforeEach + void setup() { + Logger loggerPersistentMessageHandler = (Logger) LoggerFactory.getLogger(SolacePersistentMessageHandler.class); + listAppenderPersistentMessageHandler = new ListAppender<>(); + listAppenderPersistentMessageHandler.start(); + loggerPersistentMessageHandler.addAppender(listAppenderPersistentMessageHandler); + + Logger loggerScanCommandMessageProcessor = (Logger) LoggerFactory.getLogger(ScanCommandMessageProcessor.class); + listAppendersCanCommandMessageProcessor = new ListAppender<>(); + listAppendersCanCommandMessageProcessor.start(); + loggerScanCommandMessageProcessor.addAppender(listAppendersCanCommandMessageProcessor); + + inboundMessage = mock(InboundMessage.class); + } + + @Test + void testPersistentMessageHandlerScanCommandMsgAcked() { + ScanCommandMessage scanCommandMessage = + new ScanCommandMessage("messagingServiceId", + "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + ScanCommandMessage.class.getCanonicalName() + ); + when(scanManager.scan(any())).thenReturn("scanId"); + when(scanManager.isScanComplete("scanId")).thenReturn(true); + solacePersistentMessageHandler.onMessage(inboundMessage); + // sleep for a while to allow the scan complete poll interval to pass + waitForScanCompletePolling(2); + // the scan command message processor should be called once by the persistent message handler + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + // if the scan is managed, the waitForScanCompletion method should be called + verify(scanCommandMessageProcessor, atLeastOnce()).waitForScanCompletion(any()); + // the message should be acked after the scan is complete + verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); + } + + @Test + void testPersistentMessageHandlerScanCommandTimeoutMsgAcked() { + ScanCommandMessage scanCommandMessage = + new ScanCommandMessage("messagingServiceId", + "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + ScanCommandMessage.class.getCanonicalName() + ); + when(scanManager.scan(any())).thenReturn("scanId"); + // the scan is not complete and will never be ;-) + // the waitForScanCompletion method will throw an exception after the timeout + when(scanManager.isScanComplete("scanId")).thenReturn(false); + solacePersistentMessageHandler.onMessage(inboundMessage); + // sleep for a while to allow the scan complete poll interval to pass + waitForScanCompletePolling(5); + + + // the scan command message processor should be called once by the persistent message handler + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + // if the scan is managed, the waitForScanCompletion method should be called + verify(scanCommandMessageProcessor, atLeastOnce()).waitForScanCompletion(any()); + // the message should be acked after the scan is complete, even though it is timed out + verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); + List logs = listAppendersCanCommandMessageProcessor.list; + assertThat(logs.get(logs.size() - 1).getFormattedMessage()) + .isEqualTo("Scan with id scanId did not complete within the expected time"); + } + + @Test + void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { + ScanCommandMessage scanCommandMessage = + new ScanCommandMessage("messagingServiceId", + "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + ScanCommandMessage.class.getCanonicalName() + ); + when(scanManager.scan(any())).thenThrow(new RuntimeException("Test exception thrown on purpose")); + solacePersistentMessageHandler.onMessage(inboundMessage); + // sleep for a while to allow the scan complete poll interval to pass + waitForScanCompletePolling(2); + List logs = listAppenderPersistentMessageHandler.list; + assertThat(logs.get(logs.size() - 1).getFormattedMessage()) + .isEqualTo("Error while processing inbound message from queue for mopMessageSubclass: " + ScanCommandMessage.class.getCanonicalName()); + + // the scan command message processor should be called once by the persistent message handler + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + verify(scanCommandMessageProcessor, atLeastOnce()).onFailure(any(), any()); + // the message should be acked after the scan is complete + verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); + } + + private void waitForScanCompletePolling(Integer additionalSeconds) { + try { + int timeout =eventPortalProperties.getWaitAckScanCompleteTimeout() + additionalSeconds; + Thread.sleep(timeout* 1000L ); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private String jsonString(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } + + } + +} From 5e9658ec771f3ff16e2f37992fd28ec70e94e6b7 Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 16 Oct 2024 17:22:24 +0200 Subject: [PATCH 02/20] added meter support --- .../agent/publisher/ScanStatusPublisher.java | 17 ++++++++++++++++- .../management/agent/service/ScanService.java | 19 ++++++++++++++++--- .../ScanCommandMessageProcessor.java | 15 +++++++++++---- .../agent/service/ScanServiceTests.java | 14 ++++++++++++-- 4 files changed, 55 insertions(+), 10 deletions(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java index 58d2cee0a..be38e8741 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java @@ -6,6 +6,7 @@ import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher; import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanOverallStatusException; import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanStatusException; +import io.micrometer.core.instrument.MeterRegistry; import com.solace.maas.ep.event.management.agent.subscriber.SolacePersistentMessageHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -15,15 +16,23 @@ import java.util.List; import java.util.Map; +import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_SENT; +import static com.solace.maas.ep.common.metrics.ObservabilityConstants.ORG_ID_TAG; +import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG; +import static com.solace.maas.ep.common.metrics.ObservabilityConstants.STATUS_TAG; + @Slf4j @Component @ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false") public class ScanStatusPublisher { private final SolacePublisher solacePublisher; + private final MeterRegistry meterRegistry; - public ScanStatusPublisher(SolacePublisher solacePublisher) { + public ScanStatusPublisher(SolacePublisher solacePublisher, + MeterRegistry meterRegistry) { this.solacePublisher = solacePublisher; + this.meterRegistry = meterRegistry; } /** @@ -47,6 +56,9 @@ public void sendOverallScanStatus(ScanStatusMessage message, Map } catch (Exception e) { throw new ScanOverallStatusException("Over all status exception: " + e.getMessage(), Map.of(scanId, List.of(e)), "Overall status", Arrays.asList(scanType.split(",")), ScanStatus.valueOf(status)); + } finally { + meterRegistry.counter(MAAS_EMA_SCAN_EVENT_SENT, STATUS_TAG, status, SCAN_ID_TAG, scanId, + ORG_ID_TAG, topicDetails.get("orgId")).increment(); } } @@ -73,6 +85,9 @@ public void sendScanDataStatus(ScanDataStatusMessage message, Map scanAsync(String groupId, String scanId, String traceId, String actorId, diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java index bb79daa52..a84da6d8e 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java @@ -4,6 +4,7 @@ import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; import com.solace.maas.ep.event.management.agent.scanManager.model.ScanRequestBO; +import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import net.logstash.logback.encoder.org.apache.commons.lang3.StringUtils; import org.apache.commons.collections4.CollectionUtils; @@ -15,7 +16,8 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; - +import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_RECEIVED; +import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; @@ -28,19 +30,24 @@ public class ScanCommandMessageProcessor implements MessageProcessor destinations = new ArrayList<>(); List entityTypes = new ArrayList<>(); @@ -68,7 +75,7 @@ public void processMessage(ScanCommandMessage message) { ScanRequestBO scanRequestBO = ScanRequestBO.builder() .messagingServiceId(message.getMessagingServiceId()) - .scanId(!StringUtils.isEmpty(message.getScanId()) ? message.getScanId() : UUID.randomUUID().toString()) + .scanId(scanId) .traceId(message.getTraceId()) .actorId(message.getActorId()) .scanTypes(entityTypes) @@ -77,7 +84,7 @@ public void processMessage(ScanCommandMessage message) { log.info("Received scan request {}. Request details: {}", scanRequestBO.getScanId(), scanRequestBO); - String scanId = scanManager.scan(scanRequestBO); + scanManager.scan(scanRequestBO); //if managed, wait for scan to complete if (eventPortalProperties.getManaged()) { log.debug("Waiting for scan to complete for scanId: {}", scanId); diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index df7924698..e7a810b1a 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -18,6 +18,9 @@ import com.solace.maas.ep.event.management.agent.repository.scan.ScanTypeRepository; import com.solace.maas.ep.event.management.agent.service.logging.LoggingService; import com.solace.maas.ep.event.management.agent.util.IDGenerator; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.noop.NoopCounter; import lombok.SneakyThrows; import org.apache.camel.Processor; import org.apache.camel.Produce; @@ -95,6 +98,9 @@ public class ScanServiceTests { @Autowired private ScanServiceHelper scanServiceHelper; + @Mock + private MeterRegistry meterRegistry; + @Test @SneakyThrows public void testSingleScanWithRouteBundle() { @@ -143,9 +149,10 @@ public void testSingleScanWithRouteBundle() { .thenReturn(scanType); when(scanStatusRepository.save(scanStatus)) .thenReturn(scanStatus); - when(scanRecipientHierarchyRepository.save(any(ScanRecipientHierarchyEntity.class))) .thenReturn(mock(ScanRecipientHierarchyEntity.class)); + when(meterRegistry.counter(any(), any(String[].class))) + .thenReturn(new NoopCounter(new Meter.Id("noop", null, null, null, null))); scanService.singleScan(List.of(topicListing, consumerGroups, additionalConsumerGroupConfigBundle), "groupId", @@ -310,9 +317,12 @@ public void testParseRouteRecipients() { @Test @SneakyThrows public void testSendScanStatus() { + when(meterRegistry.counter(any(), any(String[].class))) + .thenReturn(new NoopCounter(new Meter.Id("noop", null, null, null, null))); ScanService service = new ScanService(mock(ScanRepository.class), mock(ScanRecipientHierarchyRepository.class), mock(ScanTypeRepository.class), - mock(ScanStatusRepository.class), mock(ScanRouteService.class), mock(RouteService.class), template, idGenerator); + mock(ScanStatusRepository.class), mock(ScanRouteService.class), mock(RouteService.class), + template, idGenerator, meterRegistry); service.sendScanStatus("scanId", "groupId", "messagingServiceId", "traceId", "actorId", "queueListing", ScanStatus.IN_PROGRESS); From 7e5233840ebbdc31b857b7017f4f4ea176203ab4 Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 16 Oct 2024 17:25:38 +0200 Subject: [PATCH 03/20] code cleanup --- .../management/agent/publisher/ScanStatusPublisher.java | 1 - .../agent/subscriber/SolacePersistentMessageHandler.java | 1 - .../messageProcessors/ScanCommandMessageProcessor.java | 1 - .../subscriber/CommandPersistentMessageHandlerTests.java | 7 ------- .../agent/subscriber/PersistentMessageHandlerTests.java | 2 -- .../subscriber/ScanJobPersistentMessageHandlerTests.java | 2 -- 6 files changed, 14 deletions(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java index be38e8741..077a63fc7 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/publisher/ScanStatusPublisher.java @@ -7,7 +7,6 @@ import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanOverallStatusException; import com.solace.maas.ep.event.management.agent.plugin.route.exceptions.ScanStatusException; import io.micrometer.core.instrument.MeterRegistry; -import com.solace.maas.ep.event.management.agent.subscriber.SolacePersistentMessageHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index 15737f396..8bd2f4f20 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -1,6 +1,5 @@ package com.solace.maas.ep.event.management.agent.subscriber; -import com.solace.maas.ep.common.messages.ScanCommandMessage; import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants; import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.MessageProcessor; diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java index a84da6d8e..3e1379001 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java @@ -18,7 +18,6 @@ import java.util.UUID; import static com.solace.maas.ep.common.metrics.ObservabilityConstants.MAAS_EMA_SCAN_EVENT_RECEIVED; import static com.solace.maas.ep.common.metrics.ObservabilityConstants.SCAN_ID_TAG; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.awaitility.Awaitility.await; diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java index 3455427fe..c86cad664 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java @@ -6,8 +6,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.solace.maas.ep.common.messages.CommandMessage; -import com.solace.maas.ep.common.messages.ScanCommandMessage; -import com.solace.maas.ep.common.messages.ScanDataImportMessage; import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle; import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType; @@ -31,13 +29,8 @@ import java.util.List; -import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL; -import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL; import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java index e9e5fd0b2..bfc4009a3 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java @@ -36,8 +36,6 @@ import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index fae01265b..5428eab2c 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -9,7 +9,6 @@ import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants; import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; -import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.CommandMessageProcessor; import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor; import com.solace.messaging.MessagingService; import com.solace.messaging.receiver.InboundMessage; @@ -31,7 +30,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; From ee04c063d629729117356d089540356eecc6e30b Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 16 Oct 2024 18:04:20 +0200 Subject: [PATCH 04/20] code cleanup --- .../SolacePersistentMessageHandler.java | 2 +- .../CommandPersistentMessageHandlerTests.java | 132 ------------------ .../PersistentMessageHandlerTests.java | 2 + 3 files changed, 3 insertions(+), 133 deletions(-) delete mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index 8bd2f4f20..416edf484 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -36,8 +36,8 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp private final EventPortalProperties eventPortalProperties; private final ThreadPoolTaskExecutor executor; @Getter + @SuppressWarnings("PMD.MutableStaticState") private PersistentMessageReceiver persistentMessageReceiver; - public static Map seenScanIds = new HashMap<>(); protected SolacePersistentMessageHandler(MessagingService messagingService, EventPortalProperties eventPortalProperties, diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java deleted file mode 100644 index c86cad664..000000000 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/CommandPersistentMessageHandlerTests.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.solace.maas.ep.event.management.agent.subscriber; - -import ch.qos.logback.classic.Logger; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.solace.maas.ep.common.messages.CommandMessage; -import com.solace.maas.ep.event.management.agent.config.eventPortal.EventPortalProperties; -import com.solace.maas.ep.event.management.agent.plugin.command.model.CommandBundle; -import com.solace.maas.ep.event.management.agent.plugin.command.model.ExecutionType; -import com.solace.maas.ep.event.management.agent.plugin.mop.MOPConstants; -import com.solace.maas.ep.event.management.agent.plugin.mop.MOPSvcType; -import com.solace.maas.ep.event.management.agent.scanManager.ScanManager; -import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.CommandMessageProcessor; -import com.solace.maas.ep.event.management.agent.subscriber.messageProcessors.ScanCommandMessageProcessor; -import com.solace.messaging.MessagingService; -import com.solace.messaging.receiver.InboundMessage; -import com.solace.messaging.receiver.PersistentMessageReceiver; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.boot.test.mock.mockito.SpyBean; -import org.springframework.test.context.ActiveProfiles; - -import java.util.List; - -import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ActiveProfiles("TEST") -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { - "eventPortal.gateway.messaging.standalone=false", - "eventPortal.managed=true", - "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123" -}) -@Slf4j -public class CommandPersistentMessageHandlerTests { - - @MockBean - private ScanManager scanManager; - - @MockBean - private PersistentMessageReceiver persistentMessageReceiver; - - @Autowired - private MessagingService messagingService; - - @SpyBean - private ScanCommandMessageProcessor scanCommandMessageProcessor; - - @SpyBean - private CommandMessageProcessor commandMessageProcessor; - - @Autowired - private EventPortalProperties eventPortalProperties; - - @SpyBean - private SolacePersistentMessageHandler solacePersistentMessageHandler; - - @Autowired - private ObjectMapper objectMapper; - - private InboundMessage inboundMessage; - - private ListAppender listAppender; - - @BeforeEach - void setup() { - Logger scanLogger = (Logger) LoggerFactory.getLogger(SolacePersistentMessageHandler.class); - listAppender = new ListAppender<>(); - listAppender.start(); - scanLogger.addAppender(listAppender); - inboundMessage = mock(InboundMessage.class); - } - - @Test - void testCommandMessageHandler() { - CommandMessage message = new CommandMessage(); - message.setOrigType(MOPSvcType.maasEventMgmt); - message.withMessageType(generic); - message.setContext("abc"); - message.setServiceId("someSvcId"); - message.setActorId("myActorId"); - message.setOrgId(eventPortalProperties.getOrganizationId()); - message.setTraceId("myTraceId"); - message.setCommandCorrelationId("myCorrelationId"); - message.setCommandBundles(List.of( - CommandBundle.builder() - .executionType(ExecutionType.serial) - .exitOnFailure(true) - .commands(List.of()) - .build())); - when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(message)); - when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( - CommandMessage.class.getCanonicalName() - ); - - solacePersistentMessageHandler.onMessage(inboundMessage); - - verify(commandMessageProcessor, times(1)).castToMessageClass(any()); - verify(commandMessageProcessor, times(1)).processMessage(any()); - - // There must be no interaction with scanCommandMessageProcessor - verify(scanCommandMessageProcessor, times(0)).castToMessageClass(any()); - verify(scanCommandMessageProcessor, times(0)).processMessage(any()); - - - verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); - } - - - - - private String jsonString(Object object) { - try { - return objectMapper.writeValueAsString(object); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException(e); - } - - } - -} diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java index bfc4009a3..89b8f3b60 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java @@ -143,6 +143,8 @@ void testScanCommandMessageHandler() { // There must be no interaction with scanCommandMessageProcessor verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any()); verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + + verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); } @Test From 1f920b2a359c3a74990d616f541654c1888c6749 Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 16 Oct 2024 18:11:14 +0200 Subject: [PATCH 05/20] fixed POM --- service/application/pom.xml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/service/application/pom.xml b/service/application/pom.xml index 04d2909a0..8675cb702 100644 --- a/service/application/pom.xml +++ b/service/application/pom.xml @@ -290,6 +290,11 @@ org.springframework.boot spring-boot-starter-actuator + + org.awaitility + awaitility + 4.2.0 + @@ -298,12 +303,7 @@ 2.2 test - - org.awaitility - awaitility - 4.2.0 - test - + com.solacesystems From 84b4feec48a9d4bfcee9b88ac5d78a0ca482befd Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 16 Oct 2024 18:32:53 +0200 Subject: [PATCH 06/20] fixed test --- .../agent/subscriber/PersistentMessageHandlerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java index 89b8f3b60..537aa4c2e 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java @@ -144,7 +144,7 @@ void testScanCommandMessageHandler() { verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any()); verify(scanCommandMessageProcessor, times(1)).processMessage(any()); - verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); + } @Test From 3a78578123e29c5f5b03383acdff0faee422b392 Mon Sep 17 00:00:00 2001 From: mynecker Date: Thu, 17 Oct 2024 14:02:34 +0200 Subject: [PATCH 07/20] added ScanServiceTests --- .../agent/service/ScanServiceTests.java | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index e7a810b1a..8820a09f8 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -27,6 +27,8 @@ import org.apache.camel.ProducerTemplate; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; @@ -339,4 +341,71 @@ public void testFindById() { assertThatNoException(); } + + @ParameterizedTest + @ValueSource(strings = {"COMPLETE", "FAILED", "TIMED_OUT"}) + @SneakyThrows + public void testIsScanCompleteIsComplete(String scanStatus) { + ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", scanStatus); + ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); + + when(scanTypeRepository.findAllByScanId(any(String.class))) + .thenReturn(List.of(scanTypeA)); + when(scanStatusRepository.findByScanType(any(ScanTypeEntity.class))) + .thenReturn(scanStatusA); + + Assertions.assertTrue(scanService.isScanComplete("scan1")); + } + + @ParameterizedTest + @ValueSource(strings = {"IN_PROGRESS", "INITIATED"}) + @SneakyThrows + public void testIsScanCompleteIsNotComplete(String scanStatus) { + ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", scanStatus); + ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); + + when(scanTypeRepository.findAllByScanId(any(String.class))) + .thenReturn(List.of(scanTypeA)); + when(scanStatusRepository.findByScanType(any(ScanTypeEntity.class))) + .thenReturn(scanStatusA); + + Assertions.assertFalse(scanService.isScanComplete("scan1")); + } + + @Test + @SneakyThrows + public void testIsScanCompleteIsCompleteScans() { + ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE"); + ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); + + ScanStatusEntity scanStatusB = scanServiceHelper.buildScanStatusEntity("status2", "COMPLETE"); + ScanTypeEntity scanTypeB = scanServiceHelper.buildScanTypeEntity("124", "queueListing", null, scanStatusB); + + when(scanTypeRepository.findAllByScanId(any(String.class))) + .thenReturn(List.of(scanTypeA,scanTypeB)); + when(scanStatusRepository.findByScanType(scanTypeA)) + .thenReturn(scanStatusA); + when(scanStatusRepository.findByScanType(scanTypeB)) + .thenReturn(scanStatusB); + + Assertions.assertTrue(scanService.isScanComplete("scan1")); + } + + @Test + @SneakyThrows + public void testIsScanCompleteIsNotCompleteScans() { + ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE"); + ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); + + ScanStatusEntity scanStatusB = scanServiceHelper.buildScanStatusEntity("status2", "IN_PROGRESS"); + ScanTypeEntity scanTypeB = scanServiceHelper.buildScanTypeEntity("124", "queueConfiguration", null, scanStatusB); + + when(scanTypeRepository.findAllByScanId(any(String.class))) + .thenReturn(List.of(scanTypeA,scanTypeB)); + when(scanStatusRepository.findByScanType(scanTypeA)) + .thenReturn(scanStatusA); + when(scanStatusRepository.findByScanType(scanTypeB)) + .thenReturn(scanStatusB); + Assertions.assertFalse(scanService.isScanComplete("scan1")); + } } From cb4716a907bf4e03064e15c49b9001496ae7653b Mon Sep 17 00:00:00 2001 From: mynecker Date: Thu, 17 Oct 2024 14:56:22 +0200 Subject: [PATCH 08/20] refactorings --- .../ep/event/management/agent/scanManager/ScanManager.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java index 428042592..a250d8f54 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java @@ -100,7 +100,7 @@ public String scan(ScanRequestBO scanRequestBO) { ).stream() .findFirst() .orElseThrow()) - .collect(Collectors.toUnmodifiableList()); + .toList(); List brokerScanTypes = scanRequestBO.getScanTypes(); List routes = brokerScanTypes.stream() @@ -110,9 +110,8 @@ public String scan(ScanRequestBO scanRequestBO) { brokerScanType, e.getKey())) .filter(Objects::nonNull) .filter(list -> !list.isEmpty()) - .collect(Collectors.toList()).stream() - ) - .collect(Collectors.toList()).stream().flatMap(List::stream).collect(Collectors.toList()); + .toList().stream() + ).toList().stream().flatMap(List::stream).toList(); return scanService.singleScan(routes, groupId, scanId, traceId, actorId, messagingServiceEntity, runtimeAgentId); } From 8b8a77e21676bf412106af281e93555298fd4167 Mon Sep 17 00:00:00 2001 From: mynecker Date: Thu, 17 Oct 2024 15:16:40 +0200 Subject: [PATCH 09/20] added unit test --- .../management/agent/service/ScanServiceTests.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index 8820a09f8..2cd71e3a4 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -372,6 +372,19 @@ public void testIsScanCompleteIsNotComplete(String scanStatus) { Assertions.assertFalse(scanService.isScanComplete("scan1")); } + @Test + @SneakyThrows + public void testIsScanCompleteIsNotCompleteEmptyScanTypeList() { + ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE"); + + when(scanTypeRepository.findAllByScanId(any(String.class))) + .thenReturn(List.of()); + when(scanStatusRepository.findByScanType(any(ScanTypeEntity.class))) + .thenReturn(scanStatusA); + + Assertions.assertFalse(scanService.isScanComplete("scan1")); + } + @Test @SneakyThrows public void testIsScanCompleteIsCompleteScans() { From 9f1a5b3360988824511d6043322fd98527fd0f53 Mon Sep 17 00:00:00 2001 From: mynecker Date: Mon, 21 Oct 2024 11:29:41 +0200 Subject: [PATCH 10/20] fixed review recommendations --- service/application/pom.xml | 2 +- .../eventPortal/EventPortalProperties.java | 4 ++-- .../ScanCommandMessageProcessor.java | 6 ++--- .../agent/service/ScanServiceTests.java | 23 +++++-------------- .../ScanJobPersistentMessageHandlerTests.java | 8 +++---- 5 files changed, 16 insertions(+), 27 deletions(-) diff --git a/service/application/pom.xml b/service/application/pom.xml index 8675cb702..874a3f3ec 100644 --- a/service/application/pom.xml +++ b/service/application/pom.xml @@ -19,6 +19,7 @@ 3.2.4 3.5.0 2.16.1 + 4.2.0 1.4.13 @@ -293,7 +294,6 @@ org.awaitility awaitility - 4.2.0 diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java index c1b76ac72..ea7eca24e 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/config/eventPortal/EventPortalProperties.java @@ -24,8 +24,8 @@ public class EventPortalProperties { private Boolean managed = false; private String incomingRequestQueueName; - private int waitAckScanCompleteTimeout = 300; - private int waitAckScanCompletePollInterval = 10; + private int waitAckScanCompleteTimeoutSec = 300; + private int waitAckScanCompletePollIntervalSec = 10; private GatewayProperties gateway = new GatewayProperties("standalone", "standalone", new GatewayMessagingProperties(true, false, List.of())); diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java index 3e1379001..6d7f80307 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/messageProcessors/ScanCommandMessageProcessor.java @@ -85,7 +85,7 @@ public void processMessage(ScanCommandMessage message) { scanManager.scan(scanRequestBO); //if managed, wait for scan to complete - if (eventPortalProperties.getManaged()) { + if (Boolean.TRUE.equals(eventPortalProperties.getManaged())) { log.debug("Waiting for scan to complete for scanId: {}", scanId); waitForScanCompletion(scanId); } @@ -94,8 +94,8 @@ public void processMessage(ScanCommandMessage message) { public void waitForScanCompletion(String scanId) { try { await() - .atMost(eventPortalProperties.getWaitAckScanCompleteTimeout(), SECONDS) - .pollInterval(eventPortalProperties.getWaitAckScanCompletePollInterval(), SECONDS) + .atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec(), SECONDS) + .pollInterval(eventPortalProperties.getWaitAckScanCompletePollIntervalSec(), SECONDS) .pollInSameThread() .until(() -> { try { diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index 2cd71e3a4..1316c28cd 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -345,7 +345,7 @@ public void testFindById() { @ParameterizedTest @ValueSource(strings = {"COMPLETE", "FAILED", "TIMED_OUT"}) @SneakyThrows - public void testIsScanCompleteIsComplete(String scanStatus) { + public void testScanStatusCompletionWithSingleScan(String scanStatus) { ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", scanStatus); ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); @@ -360,7 +360,7 @@ public void testIsScanCompleteIsComplete(String scanStatus) { @ParameterizedTest @ValueSource(strings = {"IN_PROGRESS", "INITIATED"}) @SneakyThrows - public void testIsScanCompleteIsNotComplete(String scanStatus) { + public void testScanStatusIncompleteWithSingleScan(String scanStatus) { ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", scanStatus); ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); @@ -372,22 +372,11 @@ public void testIsScanCompleteIsNotComplete(String scanStatus) { Assertions.assertFalse(scanService.isScanComplete("scan1")); } - @Test - @SneakyThrows - public void testIsScanCompleteIsNotCompleteEmptyScanTypeList() { - ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE"); - when(scanTypeRepository.findAllByScanId(any(String.class))) - .thenReturn(List.of()); - when(scanStatusRepository.findByScanType(any(ScanTypeEntity.class))) - .thenReturn(scanStatusA); - Assertions.assertFalse(scanService.isScanComplete("scan1")); - } - - @Test - @SneakyThrows - public void testIsScanCompleteIsCompleteScans() { + @Test + @SneakyThrows + public void testScanStatusCompletedWithMultipleScans(){ ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE"); ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); @@ -406,7 +395,7 @@ public void testIsScanCompleteIsCompleteScans() { @Test @SneakyThrows - public void testIsScanCompleteIsNotCompleteScans() { + public void testScanStatusNotCompleteWithMultipleScans() { ScanStatusEntity scanStatusA = scanServiceHelper.buildScanStatusEntity("status1", "COMPLETE"); ScanTypeEntity scanTypeA = scanServiceHelper.buildScanTypeEntity("123", "queueListing", null, scanStatusA); diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index 5428eab2c..1bfcf6994 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -46,7 +46,7 @@ }) @Slf4j -public class ScanJobPersistentMessageHandlerTests { +class ScanJobPersistentMessageHandlerTests { @MockBean private ScanManager scanManager; @@ -105,7 +105,7 @@ void testPersistentMessageHandlerScanCommandMsgAcked() { waitForScanCompletePolling(2); // the scan command message processor should be called once by the persistent message handler verify(scanCommandMessageProcessor, times(1)).processMessage(any()); - // if the scan is managed, the waitForScanCompletion method should be called + // if the EMA is managed, the waitForScanCompletion method should be called verify(scanCommandMessageProcessor, atLeastOnce()).waitForScanCompletion(any()); // the message should be acked after the scan is complete verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); @@ -131,7 +131,7 @@ void testPersistentMessageHandlerScanCommandTimeoutMsgAcked() { // the scan command message processor should be called once by the persistent message handler verify(scanCommandMessageProcessor, times(1)).processMessage(any()); - // if the scan is managed, the waitForScanCompletion method should be called + // if the EMA is managed, the waitForScanCompletion method should be called verify(scanCommandMessageProcessor, atLeastOnce()).waitForScanCompletion(any()); // the message should be acked after the scan is complete, even though it is timed out verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); @@ -166,7 +166,7 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { private void waitForScanCompletePolling(Integer additionalSeconds) { try { - int timeout =eventPortalProperties.getWaitAckScanCompleteTimeout() + additionalSeconds; + int timeout =eventPortalProperties.getWaitAckScanCompleteTimeoutSec() + additionalSeconds; Thread.sleep(timeout* 1000L ); } catch (InterruptedException e) { throw new RuntimeException(e); From 0576ee7fbf78776bb9c3b9607704e9a392a4b2df Mon Sep 17 00:00:00 2001 From: mynecker Date: Mon, 21 Oct 2024 11:35:19 +0200 Subject: [PATCH 11/20] fixed review recommendations --- .../agent/subscriber/PersistentMessageHandlerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java index 537aa4c2e..920adfc1a 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java @@ -140,7 +140,7 @@ void testScanCommandMessageHandler() { verify(commandMessageProcessor, times(0)).castToMessageClass(any()); verify(commandMessageProcessor, times(0)).processMessage(any()); - // There must be no interaction with scanCommandMessageProcessor + // There must be an interaction with scanCommandMessageProcessor verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any()); verify(scanCommandMessageProcessor, times(1)).processMessage(any()); From e13465c0f736bd4b7ee89ff8f91f5b2014d4c846 Mon Sep 17 00:00:00 2001 From: mynecker Date: Mon, 21 Oct 2024 15:30:41 +0200 Subject: [PATCH 12/20] fixed review recommendations --- .../agent/subscriber/ScanJobPersistentMessageHandlerTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index 1bfcf6994..3fe993deb 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -101,6 +101,7 @@ void testPersistentMessageHandlerScanCommandMsgAcked() { when(scanManager.scan(any())).thenReturn("scanId"); when(scanManager.isScanComplete("scanId")).thenReturn(true); solacePersistentMessageHandler.onMessage(inboundMessage); + // sleep for a while to allow the scan complete poll interval to pass waitForScanCompletePolling(2); // the scan command message processor should be called once by the persistent message handler From 22a7967c373d95fcdd9d9c0d966cd973d4d25e80 Mon Sep 17 00:00:00 2001 From: mynecker Date: Mon, 21 Oct 2024 15:42:51 +0200 Subject: [PATCH 13/20] fixed timeout issue for IT test --- .../subscriber/ScanJobPersistentMessageHandlerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index 3fe993deb..cbc53edf5 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -40,8 +40,8 @@ "eventPortal.gateway.messaging.standalone=false", "eventPortal.managed=true", "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123", - "eventPortal.waitAckScanCompletePollInterval=1", - "eventPortal.waitAckScanCompleteTimeout=10", + "eventPortal.waitAckScanCompletePollIntervalSec=1", + "eventPortal.waitAckScanCompleteTimeoutSec=10", }) From 3e41dfd5d27d8a5a8d0fddbd934f0985e44516d8 Mon Sep 17 00:00:00 2001 From: mynecker Date: Tue, 22 Oct 2024 10:53:44 +0200 Subject: [PATCH 14/20] added observer for SolacePersistentMessageHandler to simplify unit testing --- .../SolacePersistentMessageHandler.java | 22 +++++- ...olacePersistentMessageHandlerObserver.java | 19 +++++ .../PersistentMessageHandlerTests.java | 36 +++++----- .../ScanJobPersistentMessageHandlerTests.java | 42 +++++++---- ...olacePersistentMessageHandlerObserver.java | 70 +++++++++++++++++++ 5 files changed, 160 insertions(+), 29 deletions(-) create mode 100644 service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java create mode 100644 service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index 416edf484..e641d45d7 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -10,6 +10,7 @@ import com.solace.messaging.receiver.PersistentMessageReceiver; import com.solace.messaging.resources.Queue; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -39,6 +40,10 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp @SuppressWarnings("PMD.MutableStaticState") private PersistentMessageReceiver persistentMessageReceiver; + // This is a mutable field, but it is only used for testing purposes + @Setter + private SolacePersistentMessageHandlerObserver messageHandlerObserver; + protected SolacePersistentMessageHandler(MessagingService messagingService, EventPortalProperties eventPortalProperties, List messageProcessorList) { @@ -55,16 +60,22 @@ protected SolacePersistentMessageHandler(MessagingService messagingService, executor.setThreadNamePrefix("solace-persistent-message-handler-pool-"); executor.setTaskDecorator(new MdcTaskDecorator()); executor.initialize(); - } + @Override public void onMessage(InboundMessage inboundMessage) { + if (messageHandlerObserver != null) { + messageHandlerObserver.onMessageReceived(inboundMessage); + } executor.submit(() -> processMessage(inboundMessage)); } private void processMessage(InboundMessage inboundMessage) { + if (messageHandlerObserver != null) { + messageHandlerObserver.onMessageProcessingInitiated(inboundMessage); + } String mopMessageSubclass = ""; MessageProcessor processor = null; Object message = null; @@ -80,10 +91,19 @@ private void processMessage(InboundMessage inboundMessage) { log.trace("onMessage: {}\n{}", messageClass, messageAsString); message = toMessage(messageAsString, messageClass); processor.processMessage(processor.castToMessageClass(message)); + if (messageHandlerObserver != null) { + messageHandlerObserver.onMessageProcessingCompleted(inboundMessage); + } } catch (Exception e) { handleProcessingError(mopMessageSubclass, processor, message, e); + if (messageHandlerObserver != null) { + messageHandlerObserver.onMessageProcessingFailed(inboundMessage); + } } finally { acknowledgeMessage(inboundMessage); + if (messageHandlerObserver != null) { + messageHandlerObserver.onMessageProcessingAcknowledged(inboundMessage); + } } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java new file mode 100644 index 000000000..499bb3e9d --- /dev/null +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java @@ -0,0 +1,19 @@ +package com.solace.maas.ep.event.management.agent.subscriber; + +import com.solace.messaging.receiver.InboundMessage; + +/** + * The SolacePersistentMessageHandlerObserver interface defines methods to observe + * the lifecycle of messages handled by a Solace message handler. Implementers can + * use this interface to react to various stages of message processing. + * Primary use case is for testing purposes. + */ +public interface SolacePersistentMessageHandlerObserver { + + void onMessageReceived(InboundMessage message); + void onMessageProcessingInitiated(InboundMessage message); + void onMessageProcessingCompleted(InboundMessage message); + void onMessageProcessingAcknowledged(InboundMessage message); + void onMessageProcessingFailed(InboundMessage message); + +} diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java index 920adfc1a..0d39e0887 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerTests.java @@ -34,7 +34,9 @@ import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL; import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL; import static com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessageType.generic; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -48,7 +50,12 @@ @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "eventPortal.gateway.messaging.standalone=false", "eventPortal.managed=true", - "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123" + "eventPortal.incomingRequestQueueName = ep_core_ema_requests_123456_123123", + "eventPortal.waitAckScanCompletePollIntervalSec=1", + "eventPortal.waitAckScanCompleteTimeoutSec=10", + "eventPortal.commandThreadPoolMinSize=5", + "eventPortal.commandThreadPoolMaxSize=10", + "eventPortal.commandThreadPoolQueueSize=20" }) @Slf4j public class PersistentMessageHandlerTests { @@ -81,6 +88,8 @@ public class PersistentMessageHandlerTests { private ListAppender listAppender; + private TestingSupportSolacePersistentMessageHandlerObserver messageHandlerObserver; + @BeforeEach void setup() { Logger scanLogger = (Logger) LoggerFactory.getLogger(SolacePersistentMessageHandler.class); @@ -88,6 +97,8 @@ void setup() { listAppender.start(); scanLogger.addAppender(listAppender); inboundMessage = mock(InboundMessage.class); + messageHandlerObserver = new TestingSupportSolacePersistentMessageHandlerObserver(); + solacePersistentMessageHandler.setMessageHandlerObserver(messageHandlerObserver); } @Test @@ -114,7 +125,7 @@ void testCommandMessageHandler() { solacePersistentMessageHandler.onMessage(inboundMessage); // Wait for the executor to process the message - waitForExecutorToProcessMessage(); + await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)); verify(commandMessageProcessor, times(1)).castToMessageClass(any()); verify(commandMessageProcessor, times(1)).processMessage(any()); @@ -135,7 +146,7 @@ void testScanCommandMessageHandler() { ); solacePersistentMessageHandler.onMessage(inboundMessage); - waitForExecutorToProcessMessage(); + await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)); verify(commandMessageProcessor, times(0)).castToMessageClass(any()); verify(commandMessageProcessor, times(0)).processMessage(any()); @@ -143,8 +154,6 @@ void testScanCommandMessageHandler() { // There must be an interaction with scanCommandMessageProcessor verify(scanCommandMessageProcessor, times(1)).castToMessageClass(any()); verify(scanCommandMessageProcessor, times(1)).processMessage(any()); - - } @Test @@ -156,7 +165,12 @@ void testUnsupportedMessageHandling() { ScanDataImportMessage.class.getCanonicalName() ); solacePersistentMessageHandler.onMessage(inboundMessage); - waitForExecutorToProcessMessage(); + await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasFailedMessage(inboundMessage)); + assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isFalse(); + assertThat(messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)).isTrue(); + List logs = listAppender.list; assertThat(logs.get(logs.size() - 1).getFormattedMessage()).isEqualTo("Unsupported message and/or processor encountered. Skipping processing"); verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); @@ -165,16 +179,6 @@ void testUnsupportedMessageHandling() { verify(commandMessageProcessor, times(0)).onFailure(any(), any()); } - private void waitForExecutorToProcessMessage() { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - - private String jsonString(Object object) { try { return objectMapper.writeValueAsString(object); diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index cbc53edf5..72b5f5de0 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -27,7 +27,9 @@ import static com.solace.maas.ep.common.model.ScanDestination.EVENT_PORTAL; import static com.solace.maas.ep.common.model.ScanType.SOLACE_ALL; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; @@ -74,6 +76,8 @@ class ScanJobPersistentMessageHandlerTests { private ListAppender listAppenderPersistentMessageHandler; private ListAppender listAppendersCanCommandMessageProcessor; + private TestingSupportSolacePersistentMessageHandlerObserver messageHandlerObserver; + @BeforeEach void setup() { Logger loggerPersistentMessageHandler = (Logger) LoggerFactory.getLogger(SolacePersistentMessageHandler.class); @@ -87,6 +91,8 @@ void setup() { loggerScanCommandMessageProcessor.addAppender(listAppendersCanCommandMessageProcessor); inboundMessage = mock(InboundMessage.class); + messageHandlerObserver = new TestingSupportSolacePersistentMessageHandlerObserver(); + solacePersistentMessageHandler.setMessageHandlerObserver(messageHandlerObserver); } @Test @@ -102,8 +108,14 @@ void testPersistentMessageHandlerScanCommandMsgAcked() { when(scanManager.isScanComplete("scanId")).thenReturn(true); solacePersistentMessageHandler.onMessage(inboundMessage); - // sleep for a while to allow the scan complete poll interval to pass - waitForScanCompletePolling(2); + //happy path - the message should be processed and acked + await().atMost(5, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); + + assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasFailedMessage(inboundMessage)).isFalse(); + // the scan command message processor should be called once by the persistent message handler verify(scanCommandMessageProcessor, times(1)).processMessage(any()); // if the EMA is managed, the waitForScanCompletion method should be called @@ -127,8 +139,14 @@ void testPersistentMessageHandlerScanCommandTimeoutMsgAcked() { when(scanManager.isScanComplete("scanId")).thenReturn(false); solacePersistentMessageHandler.onMessage(inboundMessage); // sleep for a while to allow the scan complete poll interval to pass - waitForScanCompletePolling(5); + await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+5, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); + assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); + // timeout should be logged but the message should be still acked + // timeout error handling is ultimately the responsibility of Event Portal + assertThat(messageHandlerObserver.hasFailedMessage(inboundMessage)).isFalse(); + assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isTrue(); // the scan command message processor should be called once by the persistent message handler verify(scanCommandMessageProcessor, times(1)).processMessage(any()); @@ -153,7 +171,14 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { when(scanManager.scan(any())).thenThrow(new RuntimeException("Test exception thrown on purpose")); solacePersistentMessageHandler.onMessage(inboundMessage); // sleep for a while to allow the scan complete poll interval to pass - waitForScanCompletePolling(2); + await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+2, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); + assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); + // timeout should be logged but the message should be still acked + // timeout error handling is ultimately the responsibility of Event Portal + assertThat(messageHandlerObserver.hasFailedMessage(inboundMessage)).isTrue(); + assertThat(messageHandlerObserver.hasCompletedMessageProcessing(inboundMessage)).isFalse(); + List logs = listAppenderPersistentMessageHandler.list; assertThat(logs.get(logs.size() - 1).getFormattedMessage()) .isEqualTo("Error while processing inbound message from queue for mopMessageSubclass: " + ScanCommandMessage.class.getCanonicalName()); @@ -165,14 +190,7 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { verify(solacePersistentMessageHandler.getPersistentMessageReceiver(), times(1)).ack(inboundMessage); } - private void waitForScanCompletePolling(Integer additionalSeconds) { - try { - int timeout =eventPortalProperties.getWaitAckScanCompleteTimeoutSec() + additionalSeconds; - Thread.sleep(timeout* 1000L ); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + private String jsonString(Object object) { try { diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java new file mode 100644 index 000000000..26eaa7d69 --- /dev/null +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java @@ -0,0 +1,70 @@ +package com.solace.maas.ep.event.management.agent.subscriber; + +import com.solace.messaging.receiver.InboundMessage; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class TestingSupportSolacePersistentMessageHandlerObserver implements SolacePersistentMessageHandlerObserver { + + + private final Set receivedMessages = Collections.synchronizedSet(new HashSet<>()); + private final Set initiatedMessages = Collections.synchronizedSet(new HashSet<>()); + private final Set completedMessages = Collections.synchronizedSet(new HashSet<>()); + private final Set acknowledgedMessages = Collections.synchronizedSet(new HashSet<>()); + private final Set failedMessages =Collections.synchronizedSet(new HashSet<>()); + + @Override + public void onMessageReceived(InboundMessage message) { + receivedMessages.add(message); + } + + @Override + public void onMessageProcessingInitiated(InboundMessage message) { + initiatedMessages.add(message); + } + + @Override + public void onMessageProcessingCompleted(InboundMessage message) { + completedMessages.add(message); + } + + @Override + public void onMessageProcessingAcknowledged(InboundMessage message) { + acknowledgedMessages.add(message); + } + + @Override + public void onMessageProcessingFailed(InboundMessage message) { + failedMessages.add(message); + } + + public boolean hasReceivedMessage(InboundMessage message) { + return receivedMessages.contains(message); + } + + public boolean hasInitiatedMessageProcessing(InboundMessage message) { + return initiatedMessages.contains(message); + } + + public boolean hasCompletedMessageProcessing(InboundMessage message) { + return completedMessages.contains(message); + } + + public boolean hasAcknowledgedMessage(InboundMessage message) { + return acknowledgedMessages.contains(message); + } + + public boolean hasFailedMessage(InboundMessage message) { + return failedMessages.contains(message); + } + + public void clear() { + receivedMessages.clear(); + initiatedMessages.clear(); + completedMessages.clear(); + acknowledgedMessages.clear(); + failedMessages.clear(); + } +} From ec46bb68de43a2d4e0a3f7100440a410c8200361 Mon Sep 17 00:00:00 2001 From: mynecker Date: Tue, 22 Oct 2024 14:40:38 +0200 Subject: [PATCH 15/20] added observer for SolacePersistentMessageHandler to simplify unit testing --- .../ep/event/management/agent/scanManager/ScanManager.java | 4 ++++ .../maas/ep/event/management/agent/service/ScanService.java | 4 ++++ .../agent/subscriber/SolacePersistentMessageHandler.java | 2 +- .../event/management/agent/scanManager/ScanManagerTest.java | 6 ++++++ .../ep/event/management/agent/service/ScanServiceTests.java | 6 ++++++ 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java index a250d8f54..92bab79dc 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManager.java @@ -16,6 +16,7 @@ import com.solace.maas.ep.event.management.agent.service.MessagingServiceDelegateServiceImpl; import com.solace.maas.ep.event.management.agent.service.ScanService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; @@ -160,6 +161,9 @@ public Page findByMessagingServiceId(String messagingServiceId, Page public boolean isScanComplete(String scanId) { + if (ObjectUtils.isEmpty(scanId)) { + throw new IllegalArgumentException("Scan ID cannot be null or empty"); + } return scanService.isScanComplete(scanId); } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java index e0a0593af..c6d1e9919 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/service/ScanService.java @@ -32,6 +32,7 @@ import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; @@ -453,6 +454,9 @@ protected RouteBundleHierarchyStore registerRouteRecipients(RouteBundle routeBun } public boolean isScanComplete(String scanId) { + if (ObjectUtils.isEmpty(scanId)){ + throw new IllegalArgumentException("Scan ID cannot be null or empty"); + } Set completeScanStatuses = Set.of( ScanStatus.COMPLETE.name(), ScanStatus.FAILED.name(), diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index e641d45d7..3acd56ae5 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -40,7 +40,7 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp @SuppressWarnings("PMD.MutableStaticState") private PersistentMessageReceiver persistentMessageReceiver; - // This is a mutable field, but it is only used for testing purposes + // only used for testing @Setter private SolacePersistentMessageHandlerObserver messageHandlerObserver; diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java index ed5e46e66..cb9803608 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/scanManager/ScanManagerTest.java @@ -239,5 +239,11 @@ private List getKafkaRoutes(List destinations, String ); } + @Test + void testScanStatusIsScanCompleteInvalidArgument() { + Assertions.assertThrows(IllegalArgumentException.class, () -> scanManager.isScanComplete(null)); + Assertions.assertThrows(IllegalArgumentException.class, () -> scanManager.isScanComplete("")); + } + } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java index 1316c28cd..fe84eba03 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/service/ScanServiceTests.java @@ -410,4 +410,10 @@ public void testScanStatusNotCompleteWithMultipleScans() { .thenReturn(scanStatusB); Assertions.assertFalse(scanService.isScanComplete("scan1")); } + + @Test + void testScanServiceIsScanCompleteInvalidArgument() { + Assertions.assertThrows(IllegalArgumentException.class, () -> scanService.isScanComplete(null)); + Assertions.assertThrows(IllegalArgumentException.class, () -> scanService.isScanComplete("")); + } } From 752acae85506db25c9b479ebce45ef30f39ac285 Mon Sep 17 00:00:00 2001 From: mynecker Date: Tue, 22 Oct 2024 14:42:17 +0200 Subject: [PATCH 16/20] added observer for SolacePersistentMessageHandler to simplify unit testing --- ...stingSupportSolacePersistentMessageHandlerObserver.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java index 26eaa7d69..2e42b0a2f 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java @@ -60,11 +60,4 @@ public boolean hasFailedMessage(InboundMessage message) { return failedMessages.contains(message); } - public void clear() { - receivedMessages.clear(); - initiatedMessages.clear(); - completedMessages.clear(); - acknowledgedMessages.clear(); - failedMessages.clear(); - } } From c6e9865bede2f4510a7ad75f4815ee063cc3c2ac Mon Sep 17 00:00:00 2001 From: mynecker Date: Tue, 22 Oct 2024 14:46:04 +0200 Subject: [PATCH 17/20] added observer for SolacePersistentMessageHandler to simplify unit testing --- .../subscriber/ScanJobPersistentMessageHandlerTests.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index 72b5f5de0..a369d89c0 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -140,7 +140,8 @@ void testPersistentMessageHandlerScanCommandTimeoutMsgAcked() { solacePersistentMessageHandler.onMessage(inboundMessage); // sleep for a while to allow the scan complete poll interval to pass - await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+5, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); + await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+5, SECONDS).until(() + -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); // timeout should be logged but the message should be still acked @@ -171,7 +172,8 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { when(scanManager.scan(any())).thenThrow(new RuntimeException("Test exception thrown on purpose")); solacePersistentMessageHandler.onMessage(inboundMessage); // sleep for a while to allow the scan complete poll interval to pass - await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+2, SECONDS).until(() -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); + await().atMost(eventPortalProperties.getWaitAckScanCompleteTimeoutSec()+2, SECONDS).until(() + -> messageHandlerObserver.hasAcknowledgedMessage(inboundMessage)); assertThat(messageHandlerObserver.hasReceivedMessage(inboundMessage)).isTrue(); assertThat(messageHandlerObserver.hasInitiatedMessageProcessing(inboundMessage)).isTrue(); // timeout should be logged but the message should be still acked @@ -181,7 +183,8 @@ void testPersistentMessageHandlerScanCommandExceptionThrownMsgAcked() { List logs = listAppenderPersistentMessageHandler.list; assertThat(logs.get(logs.size() - 1).getFormattedMessage()) - .isEqualTo("Error while processing inbound message from queue for mopMessageSubclass: " + ScanCommandMessage.class.getCanonicalName()); + .isEqualTo("Error while processing inbound message from queue for mopMessageSubclass: " + + ScanCommandMessage.class.getCanonicalName()); // the scan command message processor should be called once by the persistent message handler verify(scanCommandMessageProcessor, times(1)).processMessage(any()); From 5fec064021840e40f06eb49b25d628008049d205 Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 23 Oct 2024 17:35:13 +0200 Subject: [PATCH 18/20] added DefaultSolacePersistentMessageHandlerObserver to avoid redundant null checks --- ...olacePersistentMessageHandlerObserver.java | 31 +++++++++++++++++++ .../SolacePersistentMessageHandler.java | 29 ++++++++--------- 2 files changed, 44 insertions(+), 16 deletions(-) create mode 100644 service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java new file mode 100644 index 000000000..da2bb32a0 --- /dev/null +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java @@ -0,0 +1,31 @@ +package com.solace.maas.ep.event.management.agent.subscriber; + +import com.solace.messaging.receiver.InboundMessage; + +public class DefaultSolacePersistentMessageHandlerObserver implements SolacePersistentMessageHandlerObserver { + + @Override + public void onMessageReceived(InboundMessage message) { + // Do nothing + } + + @Override + public void onMessageProcessingInitiated(InboundMessage message) { + // Do nothing + } + + @Override + public void onMessageProcessingCompleted(InboundMessage message) { + // Do nothing + } + + @Override + public void onMessageProcessingAcknowledged(InboundMessage message) { + // Do nothing + } + + @Override + public void onMessageProcessingFailed(InboundMessage message) { + // Do nothing + } +} diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index 3acd56ae5..a34f5d8ba 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -41,7 +41,6 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp private PersistentMessageReceiver persistentMessageReceiver; // only used for testing - @Setter private SolacePersistentMessageHandlerObserver messageHandlerObserver; protected SolacePersistentMessageHandler(MessagingService messagingService, @@ -49,6 +48,7 @@ protected SolacePersistentMessageHandler(MessagingService messagingService, List messageProcessorList) { super(); + this.messageHandlerObserver = new DefaultSolacePersistentMessageHandlerObserver(); this.messagingService = messagingService; this.eventPortalProperties = eventPortalProperties; messageProcessorsByClassType = messageProcessorList.stream() @@ -63,19 +63,22 @@ protected SolacePersistentMessageHandler(MessagingService messagingService, } + public void setMessageHandlerObserver(SolacePersistentMessageHandlerObserver observer) { + this.messageHandlerObserver = observer; + if (this.messageHandlerObserver==null){ + this.messageHandlerObserver = new DefaultSolacePersistentMessageHandlerObserver(); + } + } + @Override public void onMessage(InboundMessage inboundMessage) { - if (messageHandlerObserver != null) { - messageHandlerObserver.onMessageReceived(inboundMessage); - } + messageHandlerObserver.onMessageReceived(inboundMessage); executor.submit(() -> processMessage(inboundMessage)); } private void processMessage(InboundMessage inboundMessage) { - if (messageHandlerObserver != null) { - messageHandlerObserver.onMessageProcessingInitiated(inboundMessage); - } + messageHandlerObserver.onMessageProcessingInitiated(inboundMessage); String mopMessageSubclass = ""; MessageProcessor processor = null; Object message = null; @@ -91,19 +94,13 @@ private void processMessage(InboundMessage inboundMessage) { log.trace("onMessage: {}\n{}", messageClass, messageAsString); message = toMessage(messageAsString, messageClass); processor.processMessage(processor.castToMessageClass(message)); - if (messageHandlerObserver != null) { - messageHandlerObserver.onMessageProcessingCompleted(inboundMessage); - } + messageHandlerObserver.onMessageProcessingCompleted(inboundMessage); } catch (Exception e) { handleProcessingError(mopMessageSubclass, processor, message, e); - if (messageHandlerObserver != null) { - messageHandlerObserver.onMessageProcessingFailed(inboundMessage); - } + messageHandlerObserver.onMessageProcessingFailed(inboundMessage); } finally { acknowledgeMessage(inboundMessage); - if (messageHandlerObserver != null) { - messageHandlerObserver.onMessageProcessingAcknowledged(inboundMessage); - } + messageHandlerObserver.onMessageProcessingAcknowledged(inboundMessage); } } From b8b4270dbb185c95def5045d3fb3258a50f1f328 Mon Sep 17 00:00:00 2001 From: mynecker Date: Wed, 23 Oct 2024 17:35:28 +0200 Subject: [PATCH 19/20] added DefaultSolacePersistentMessageHandlerObserver to avoid redundant null checks --- .../agent/subscriber/SolacePersistentMessageHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index a34f5d8ba..926ed26a5 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -10,7 +10,6 @@ import com.solace.messaging.receiver.PersistentMessageReceiver; import com.solace.messaging.resources.Queue; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; From b451b515bc92ac8358f978f669025611e3ee19c9 Mon Sep 17 00:00:00 2001 From: mynecker Date: Thu, 24 Oct 2024 13:50:49 +0200 Subject: [PATCH 20/20] refactored observer for SolacePersistentMessageHandler --- ...olacePersistentMessageHandlerObserver.java | 31 -------------- ...PersistentMessageHandlerObserverPhase.java | 9 ++++ .../SolacePersistentMessageHandler.java | 20 ++++----- ...olacePersistentMessageHandlerObserver.java | 6 +-- .../ScanJobPersistentMessageHandlerTests.java | 26 ++++++++++++ ...olacePersistentMessageHandlerObserver.java | 42 +++++++++---------- 6 files changed, 66 insertions(+), 68 deletions(-) delete mode 100644 service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java create mode 100644 service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerObserverPhase.java diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java deleted file mode 100644 index da2bb32a0..000000000 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/DefaultSolacePersistentMessageHandlerObserver.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.solace.maas.ep.event.management.agent.subscriber; - -import com.solace.messaging.receiver.InboundMessage; - -public class DefaultSolacePersistentMessageHandlerObserver implements SolacePersistentMessageHandlerObserver { - - @Override - public void onMessageReceived(InboundMessage message) { - // Do nothing - } - - @Override - public void onMessageProcessingInitiated(InboundMessage message) { - // Do nothing - } - - @Override - public void onMessageProcessingCompleted(InboundMessage message) { - // Do nothing - } - - @Override - public void onMessageProcessingAcknowledged(InboundMessage message) { - // Do nothing - } - - @Override - public void onMessageProcessingFailed(InboundMessage message) { - // Do nothing - } -} diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerObserverPhase.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerObserverPhase.java new file mode 100644 index 000000000..28eb3a9fd --- /dev/null +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/PersistentMessageHandlerObserverPhase.java @@ -0,0 +1,9 @@ +package com.solace.maas.ep.event.management.agent.subscriber; + +public enum PersistentMessageHandlerObserverPhase { + RECEIVED, + INITIATED, + COMPLETED, + ACKNOWLEDGED, + FAILED +} diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java index 926ed26a5..83b9e9aa4 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandler.java @@ -10,6 +10,7 @@ import com.solace.messaging.receiver.PersistentMessageReceiver; import com.solace.messaging.resources.Queue; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -40,6 +41,7 @@ public class SolacePersistentMessageHandler extends BaseSolaceMessageHandler imp private PersistentMessageReceiver persistentMessageReceiver; // only used for testing + @Setter private SolacePersistentMessageHandlerObserver messageHandlerObserver; protected SolacePersistentMessageHandler(MessagingService messagingService, @@ -47,7 +49,6 @@ protected SolacePersistentMessageHandler(MessagingService messagingService, List messageProcessorList) { super(); - this.messageHandlerObserver = new DefaultSolacePersistentMessageHandlerObserver(); this.messagingService = messagingService; this.eventPortalProperties = eventPortalProperties; messageProcessorsByClassType = messageProcessorList.stream() @@ -62,22 +63,21 @@ protected SolacePersistentMessageHandler(MessagingService messagingService, } - public void setMessageHandlerObserver(SolacePersistentMessageHandlerObserver observer) { - this.messageHandlerObserver = observer; - if (this.messageHandlerObserver==null){ - this.messageHandlerObserver = new DefaultSolacePersistentMessageHandlerObserver(); + private void notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase phase, InboundMessage inboundMessage) { + if (messageHandlerObserver != null) { + messageHandlerObserver.onPhaseChange(inboundMessage, phase); } } @Override public void onMessage(InboundMessage inboundMessage) { - messageHandlerObserver.onMessageReceived(inboundMessage); + notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.RECEIVED,inboundMessage); executor.submit(() -> processMessage(inboundMessage)); } private void processMessage(InboundMessage inboundMessage) { - messageHandlerObserver.onMessageProcessingInitiated(inboundMessage); + notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.INITIATED,inboundMessage); String mopMessageSubclass = ""; MessageProcessor processor = null; Object message = null; @@ -93,13 +93,13 @@ private void processMessage(InboundMessage inboundMessage) { log.trace("onMessage: {}\n{}", messageClass, messageAsString); message = toMessage(messageAsString, messageClass); processor.processMessage(processor.castToMessageClass(message)); - messageHandlerObserver.onMessageProcessingCompleted(inboundMessage); + notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.COMPLETED,inboundMessage); } catch (Exception e) { handleProcessingError(mopMessageSubclass, processor, message, e); - messageHandlerObserver.onMessageProcessingFailed(inboundMessage); + notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.FAILED,inboundMessage); } finally { acknowledgeMessage(inboundMessage); - messageHandlerObserver.onMessageProcessingAcknowledged(inboundMessage); + notifyPersistentMessageHandlerObserver(PersistentMessageHandlerObserverPhase.ACKNOWLEDGED,inboundMessage); } } diff --git a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java index 499bb3e9d..80ab07e0b 100644 --- a/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java +++ b/service/application/src/main/java/com/solace/maas/ep/event/management/agent/subscriber/SolacePersistentMessageHandlerObserver.java @@ -10,10 +10,6 @@ */ public interface SolacePersistentMessageHandlerObserver { - void onMessageReceived(InboundMessage message); - void onMessageProcessingInitiated(InboundMessage message); - void onMessageProcessingCompleted(InboundMessage message); - void onMessageProcessingAcknowledged(InboundMessage message); - void onMessageProcessingFailed(InboundMessage message); + void onPhaseChange(InboundMessage message, PersistentMessageHandlerObserverPhase phase); } diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java index a369d89c0..377352f1a 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/ScanJobPersistentMessageHandlerTests.java @@ -95,6 +95,32 @@ void setup() { solacePersistentMessageHandler.setMessageHandlerObserver(messageHandlerObserver); } + // Test that the message handler is able to process a scan command message without an observer, + // which will be the case when EMA is executed and not as unit / it test + @Test + void testPersistentMessageHandlerScanCommandMsgAckedWithoutObserver() { + solacePersistentMessageHandler.setMessageHandlerObserver(null); + ScanCommandMessage scanCommandMessage = + new ScanCommandMessage("messagingServiceId", + "scanId", List.of(SOLACE_ALL), List.of(EVENT_PORTAL)); + when(inboundMessage.getPayloadAsString()).thenReturn(jsonString(scanCommandMessage)); + when(inboundMessage.getProperty(MOPConstants.MOP_MSG_META_DECODER)).thenReturn( + ScanCommandMessage.class.getCanonicalName() + ); + when(scanManager.scan(any())).thenReturn("scanId"); + when(scanManager.isScanComplete("scanId")).thenReturn(true); + solacePersistentMessageHandler.onMessage(inboundMessage); + // we have to wait now for a second as there is no observer being notified + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + verify(scanCommandMessageProcessor, times(1)).processMessage(any()); + + + } + @Test void testPersistentMessageHandlerScanCommandMsgAcked() { ScanCommandMessage scanCommandMessage = diff --git a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java index 2e42b0a2f..85b5b6c53 100644 --- a/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java +++ b/service/application/src/test/java/com/solace/maas/ep/event/management/agent/subscriber/TestingSupportSolacePersistentMessageHandlerObserver.java @@ -15,29 +15,27 @@ public class TestingSupportSolacePersistentMessageHandlerObserver implements Sol private final Set acknowledgedMessages = Collections.synchronizedSet(new HashSet<>()); private final Set failedMessages =Collections.synchronizedSet(new HashSet<>()); - @Override - public void onMessageReceived(InboundMessage message) { - receivedMessages.add(message); - } - - @Override - public void onMessageProcessingInitiated(InboundMessage message) { - initiatedMessages.add(message); - } - - @Override - public void onMessageProcessingCompleted(InboundMessage message) { - completedMessages.add(message); - } - - @Override - public void onMessageProcessingAcknowledged(InboundMessage message) { - acknowledgedMessages.add(message); - } - @Override - public void onMessageProcessingFailed(InboundMessage message) { - failedMessages.add(message); + public void onPhaseChange(InboundMessage message, PersistentMessageHandlerObserverPhase phase) { + switch (phase) { + case RECEIVED: + receivedMessages.add(message); + break; + case INITIATED: + initiatedMessages.add(message); + break; + case COMPLETED: + completedMessages.add(message); + break; + case ACKNOWLEDGED: + acknowledgedMessages.add(message); + break; + case FAILED: + failedMessages.add(message); + break; + default: + break; + } } public boolean hasReceivedMessage(InboundMessage message) {