diff --git a/src/main/java/de/telekom/horizon/comet/service/SubscribedEventMessageHandler.java b/src/main/java/de/telekom/horizon/comet/service/SubscribedEventMessageHandler.java index 5fca9a5..c2305e7 100644 --- a/src/main/java/de/telekom/horizon/comet/service/SubscribedEventMessageHandler.java +++ b/src/main/java/de/telekom/horizon/comet/service/SubscribedEventMessageHandler.java @@ -85,7 +85,7 @@ public SubscribedEventMessageHandler(HorizonTracer tracer, * @throws JsonProcessingException If there is an error processing the JSON. */ public CompletableFuture> handleMessage(ConsumerRecord consumerRecord) throws JsonProcessingException { - log.warn("Start handling message with id {}", consumerRecord.key()); + log.debug("Start handling message with id {}", consumerRecord.key()); var subscriptionEventMessage = objectMapper.readValue(consumerRecord.value(), SubscriptionEventMessage.class); DeliveryType deliveryType = subscriptionEventMessage.getDeliveryType(); @@ -99,7 +99,6 @@ public CompletableFuture> handleMessage(ConsumerRecor var rootSpan = tracer.startSpanFromKafkaHeaders("consume subscribed message", consumerRecord.headers()); var rootSpanInScope = tracer.withSpanInScope(rootSpan); // try-with-resources not possible because scope closes after try -> we need context in catch - log.warn("Start to handle Event with id {}", subscriptionEventMessage.getSubscriptionId()); afterStatusSendFuture = handleEvent(subscriptionEventMessage, rootSpan, clientId); log.debug("Finished handling message with id {}", consumerRecord.key()); @@ -119,26 +118,25 @@ public CompletableFuture> handleMessage(ConsumerRecor * @return CompletableFuture with SendResult based on event handling outcome. */ public CompletableFuture> handleEvent(SubscriptionEventMessage subscriptionEventMessage, Span rootSpan, HorizonComponentId messageSource) { - log.warn("Check circuitBreaker for subscriptionId {}", subscriptionEventMessage.getSubscriptionId()); + log.debug("Check circuitBreaker for subscriptionId {}", subscriptionEventMessage.getSubscriptionId()); if (isCircuitBreakerOpenOrChecking(subscriptionEventMessage)) { rootSpan.annotate("Circuit Breaker open! Set event on WAITING"); return stateService.updateState(Status.WAITING, subscriptionEventMessage, null); } - log.warn("Check deduplication for subscriptionId {}", subscriptionEventMessage.getSubscriptionId()); + log.debug("Check deduplication for subscriptionId {}", subscriptionEventMessage.getSubscriptionId()); try { String msgUuidOrNull = deDuplicationService.get(subscriptionEventMessage); - log.warn("Deduplication check for subscriptionId {} returned {}", subscriptionEventMessage.getSubscriptionId(), msgUuidOrNull); + log.debug("Deduplication check for subscriptionId {} returned {}", subscriptionEventMessage.getSubscriptionId(), msgUuidOrNull); boolean isDuplicate = Objects.nonNull(msgUuidOrNull); if (isDuplicate) { - log.warn("Event with id {} is a duplicate. Check if it is the same event.", subscriptionEventMessage.getUuid()); + log.debug("Event with id {} is a duplicate. Check if it is the same event.", subscriptionEventMessage.getUuid()); // circuit breaker is not open AND event is a duplicate return handleDuplicateEvent(subscriptionEventMessage, msgUuidOrNull); } } catch (HazelcastInstanceNotActiveException ex) { - log.warn("HazelcastInstanceNotActiveException occurred while checking for duplicate event with uuid {}. Event will be delivered anyways.", subscriptionEventMessage.getUuid(), ex); log.error("HazelcastInstanceNotActiveException occurred while checking for duplicate event with uuid {}. Event will be delivered anyways.", subscriptionEventMessage.getUuid(), ex); rootSpan.annotate("HazelcastInstanceNotActiveException occurred while checking for duplicate event. Event will be delivered anyways."); rootSpan.error(ex); @@ -167,7 +165,7 @@ private boolean isCircuitBreakerOpenOrChecking(SubscriptionEventMessage subscrip * @return CompletableFuture for the DELIVERING status sending */ private CompletableFuture> deliverEvent(SubscriptionEventMessage subscriptionEventMessage, HorizonComponentId clientId){ - log.warn("Set event to DELIVERING and start delivery for subscriptionId {}", subscriptionEventMessage.getSubscriptionId()); + log.debug("Set event to DELIVERING and start delivery for subscriptionId {}", subscriptionEventMessage.getSubscriptionId()); CompletableFuture> afterStatusSendFuture = stateService.updateState(Status.DELIVERING, subscriptionEventMessage, null); cometMetrics.recordE2eEventLatencyAndExtendMetadata(subscriptionEventMessage, MetricNames.EndToEndLatencyTardis, clientId); deliveryService.deliver(subscriptionEventMessage, clientId); // Starts async task in pool