Skip to content

Commit

Permalink
feature: add callbackUrlCache, remove old code for retryableStatusCod…
Browse files Browse the repository at this point in the history
…es and add these code to right place
  • Loading branch information
julian-spierefka committed Jun 28, 2024
1 parent c116fc8 commit 0d3bb6e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 12 deletions.
12 changes: 3 additions & 9 deletions src/main/java/de/telekom/horizon/comet/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void callback(SubscriptionEventMessage subscriptionEventMessage, String c
request.setEntity(payload);

// throws IOException
executeRequest(callbackUrl, request, subscriptionEventMessage);
executeRequest(callbackUrl, request);
}

/**
Expand All @@ -134,19 +134,13 @@ public void callback(SubscriptionEventMessage subscriptionEventMessage, String c
* @throws IOException If an IO error occurs during the HTTP request.
* @throws CallbackException If the response status code is not acceptable.
*/
private void executeRequest(String callbackUrl, HttpPost request, SubscriptionEventMessage subscriptionEventMessage) throws IOException, CallbackException {
private void executeRequest(String callbackUrl, HttpPost request) throws IOException, CallbackException {
try (var response = httpClient.execute(request)) {
// Compare response status code with acceptable status codes from config
var statusCode = response.getStatusLine().getStatusCode();
var successfulStatusCodes = cometConfig.getSuccessfulStatusCodes();

var retryableStatusCodesOptional = callbackUrlCache.
getDeliveryTargetInformation(subscriptionEventMessage.getSubscriptionId()).
map(DeliveryTargetInformation::getRetryableStatusCodes);

var statusCodesToCheck = retryableStatusCodesOptional.orElse(successfulStatusCodes);

if (!statusCodesToCheck.contains(statusCode)) {
if (!successfulStatusCodes.contains(statusCode)) {
throw new CallbackException(String.format("Error while delivering event to callback '%s': %s", callbackUrl, response.getStatusLine().getReasonPhrase()), statusCode);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import brave.internal.Nullable;
import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage;
import de.telekom.eni.pandora.horizon.model.meta.HorizonComponentId;
import de.telekom.horizon.comet.cache.CallbackUrlCache;
import de.telekom.horizon.comet.service.DeliveryResultListener;
import de.telekom.horizon.comet.service.DeliveryTaskFactory;
import org.springframework.context.ApplicationContext;
Expand All @@ -19,6 +20,7 @@ public record DeliveryTaskRecord (
int retryCount,
DeliveryResultListener deliveryResultListener,
DeliveryTaskFactory deliveryTaskFactory,
CallbackUrlCache callbackUrlCache,
@Nullable Span deliverySpan,
HorizonComponentId messageSource,
ApplicationContext context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void deliver(SubscriptionEventMessage subscriptionEventMessage, HorizonCo
var callbackUrlOrEmptyStr = getCallbackUrlOrEmptyStr(subscriptionEventMessage);
// if callbackUrl is empty, we throw an CallbackUrlNotFoundException later, which gets handled

DeliveryTaskRecord deliveryTaskRecord = new DeliveryTaskRecord(subscriptionEventMessage, callbackUrlOrEmptyStr, 0, 0, this, deliveryTaskFactory, null, messageSource, null);
DeliveryTaskRecord deliveryTaskRecord = new DeliveryTaskRecord(subscriptionEventMessage, callbackUrlOrEmptyStr, 0, 0, this, deliveryTaskFactory, callbackUrlCache, null, messageSource, null);
var deliveryTask = deliveryTaskFactory.createNew(deliveryTaskRecord);

// Do not wait for thread
Expand Down Expand Up @@ -271,7 +271,7 @@ private boolean tryToRedeliver(DeliveryResult deliveryResult) {
eventUuid, backoffInterval, retryCount, cometConfig.getMaxRetries());


DeliveryTaskRecord deliveryTaskRecord = new DeliveryTaskRecord(subscriptionEventMessage, callbackUrlOrEmptyStr, backoffInterval, retryCount, this, deliveryTaskFactory, deliverySpan, deliveryResult.messageSource(), null);
DeliveryTaskRecord deliveryTaskRecord = new DeliveryTaskRecord(subscriptionEventMessage, callbackUrlOrEmptyStr, backoffInterval, retryCount, this, deliveryTaskFactory, callbackUrlCache, deliverySpan, deliveryResult.messageSource(), null);
var redeliverTask = deliveryTaskFactory.createNew(deliveryTaskRecord);
redeliveryTaskExecutor.submit(tracer.withCurrentTraceContext(redeliverTask));
return true;
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/de/telekom/horizon/comet/service/DeliveryTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage;
import de.telekom.eni.pandora.horizon.model.meta.HorizonComponentId;
import de.telekom.eni.pandora.horizon.tracing.HorizonTracer;
import de.telekom.horizon.comet.cache.CallbackUrlCache;
import de.telekom.horizon.comet.cache.DeliveryTargetInformation;
import de.telekom.horizon.comet.client.RestClient;
import de.telekom.horizon.comet.config.CometConfig;
import de.telekom.horizon.comet.config.CometMetrics;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class DeliveryTask implements Runnable {

private final CircuitBreakerCacheService circuitBreakerCacheService;

private final CallbackUrlCache callbackUrlCache;

private final DeDuplicationService deDuplicationService;

private final DeliveryResultListener deliveryResultListener;
Expand Down Expand Up @@ -92,6 +96,7 @@ public DeliveryTask(DeliveryTaskRecord deliveryTaskRecord) {
this.metricsHelper = deliveryTaskRecord.deliveryTaskFactory().getMetricsHelper();
this.cometMetrics = deliveryTaskRecord.deliveryTaskFactory().getCometMetrics();
this.circuitBreakerCacheService = deliveryTaskRecord.deliveryTaskFactory().getCircuitBreakerCacheService();
this.callbackUrlCache = deliveryTaskRecord.callbackUrlCache();
this.deDuplicationService = deliveryTaskRecord.deliveryTaskFactory().getDeDuplicationService();
this.deliverySpan = deliveryTaskRecord.deliverySpan();
this.messageSource = deliveryTaskRecord.messageSource();
Expand Down Expand Up @@ -245,7 +250,18 @@ private boolean handleCallbackException(CallbackException callbackException) {
var httpCode = callbackException.getStatusCode();
writeHttpCodeMetricTags(httpCode);

boolean shouldRedeliver = cometConfig.getRedeliveryStatusCodes().contains(httpCode);
var retryableStatusCodesOptional = callbackUrlCache
.getDeliveryTargetInformation(subscriptionEventMessage.getSubscriptionId())
.map(DeliveryTargetInformation::getRetryableStatusCodes);

var statusCodesToCheck = retryableStatusCodesOptional.orElse(cometConfig.getRedeliveryStatusCodes());

boolean shouldRedeliver;
if (retryableStatusCodesOptional.isPresent()) {
shouldRedeliver = statusCodesToCheck.contains(httpCode);
} else {
shouldRedeliver = cometConfig.getRedeliveryStatusCodes().contains(httpCode);
}

var exceptionCause = callbackException.getCause();
writeCallbackExceptionInTrace(exceptionCause, httpCode, shouldRedeliver);
Expand Down

0 comments on commit 0d3bb6e

Please sign in to comment.