Skip to content

Commit

Permalink
Merge pull request #1 from TharmiganK/sync_improve
Browse files Browse the repository at this point in the history
Remove unwanted yield and run blocks
  • Loading branch information
warunalakshitha authored Jan 23, 2025
2 parents 82152a0 + 3c376a7 commit 6e5c05c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,17 @@ public class HttpResponseInterceptorUnitCallback extends HttpCallableUnitCallbac
private final BObject response;
private final Environment environment;
private final BObject requestCtx;
private final DataContext dataContext;
private final boolean possibleLastInterceptor;


public HttpResponseInterceptorUnitCallback(HttpCarbonMessage requestMessage, BObject caller, BObject response,
Environment env, DataContext dataContext, Runtime runtime,
boolean possibleLastInterceptor) {
Environment env, Runtime runtime, boolean possibleLastInterceptor) {
super(requestMessage, runtime);
this.requestMessage = requestMessage;
this.requestCtx = (BObject) requestMessage.getProperty(HttpConstants.REQUEST_CONTEXT);
this.caller = caller;
this.response = response;
this.environment = env;
this.dataContext = dataContext;
this.possibleLastInterceptor = possibleLastInterceptor;
}

Expand Down Expand Up @@ -89,7 +86,7 @@ public void invokeErrorInterceptors(BError error, boolean isInternalError) {
}

private void sendResponseToNextService() {
Respond.nativeRespondWithDataCtx(environment, caller, response, dataContext);
Respond.nativeRespond(environment, caller, response);
}

private boolean alreadyResponded() {
Expand All @@ -107,7 +104,6 @@ private void validateResponseAndProceed(Object result) {
BArray interceptors = (BArray) requestCtx.getNativeData(HttpConstants.INTERCEPTORS);

if (alreadyResponded()) {
dataContext.notifyOutboundResponseStatus(null);
stopObserverContext();
return;
}
Expand Down Expand Up @@ -158,9 +154,7 @@ public void invokeBalMethod(Object[] paramFeed, String methodName) {
StrandMetadata metaData = new StrandMetadata(true, null);
this.getRuntime().callMethod(caller, methodName, metaData, paramFeed);
stopObserverContext();
dataContext.notifyOutboundResponseStatus(null);
} catch (BError error) {
dataContext.notifyOutboundResponseStatus(null);
sendFailureResponse(error);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,27 @@ public static Object nativeRespondError(Environment env, BObject connectionObj,
BError error) {
HttpCarbonMessage inboundRequest = HttpUtil.getCarbonMsg(connectionObj, null);
inboundRequest.setProperty(HttpConstants.INTERCEPTOR_SERVICE_ERROR, error);
return env.yieldAndRun(() -> {
CompletableFuture<Object> balFuture = new CompletableFuture<>();
nativeRespondWithDataCtx(env, connectionObj, outboundResponseObj, new DataContext(env, balFuture,
inboundRequest));
return getResult(balFuture);
});
return nativeRespond(env, connectionObj, outboundResponseObj);
}

public static Object nativeRespond(Environment env, BObject connectionObj, BObject outboundResponseObj) {
return env.yieldAndRun(() -> {
CompletableFuture<Object> balFuture = new CompletableFuture<>();
nativeRespondWithDataCtx(env, connectionObj, outboundResponseObj, new DataContext(env,
balFuture, HttpUtil.getCarbonMsg(connectionObj, null)));
return getResult(balFuture);
});
}

public static Object nativeRespondWithDataCtx(Environment env, BObject connectionObj, BObject outboundResponseObj,
DataContext dataContext) {
HttpCarbonMessage inboundRequestMsg = HttpUtil.getCarbonMsg(connectionObj, null);
if (invokeResponseInterceptor(env, inboundRequestMsg, outboundResponseObj, connectionObj, dataContext)) {
if (invokeResponseInterceptor(env, inboundRequestMsg, outboundResponseObj, connectionObj)) {
return null;
}
return nativeRespondInternal(env, connectionObj, outboundResponseObj, inboundRequestMsg);
}

public static Object nativeRespondInternal(Environment env, BObject connectionObj, BObject outboundResponseObj,
HttpCarbonMessage inboundRequestMsg) {
if (isDirtyResponse(outboundResponseObj)) {
String errorMessage = "Couldn't complete the respond operation as the response has been already used.";
HttpUtil.sendOutboundResponse(inboundRequestMsg, HttpUtil.createErrorMessage(errorMessage, 500));
if (log.isDebugEnabled()) {
log.debug("Couldn't complete the respond operation for the sequence id of the request: {} " +
"as the response has been already used.", inboundRequestMsg.getSequenceId());
}
BError httpError = HttpUtil.createHttpError(errorMessage, HttpErrorType.GENERIC_LISTENER_ERROR);
dataContext.getFuture().complete(httpError);
return null;
return HttpUtil.createHttpError(errorMessage, HttpErrorType.GENERIC_LISTENER_ERROR);
}
outboundResponseObj.addNativeData(HttpConstants.DIRTY_RESPONSE, true);
HttpCarbonMessage outboundResponseMsg = HttpUtil.getCarbonMsg(outboundResponseObj, HttpUtil.
Expand All @@ -114,15 +102,14 @@ public static Object nativeRespondWithDataCtx(Environment env, BObject connectio
HttpUtil.checkFunctionValidity(inboundRequestMsg, outboundResponseMsg);
} catch (BError e) {
log.debug(e.getPrintableStackTrace(), e);
dataContext.getFuture().complete(e);
return null;
return e;
}

// Based on https://tools.ietf.org/html/rfc7232#section-4.1
if (CacheUtils.isValidCachedResponse(outboundResponseMsg, inboundRequestMsg)) {
outboundResponseMsg.setHttpStatusCode(HttpResponseStatus.NOT_MODIFIED.code());
outboundResponseMsg.setProperty(HttpConstants.HTTP_REASON_PHRASE,
HttpResponseStatus.NOT_MODIFIED.reasonPhrase());
HttpResponseStatus.NOT_MODIFIED.reasonPhrase());
outboundResponseMsg.removeHeader(HttpHeaderNames.CONTENT_LENGTH.toString());
outboundResponseMsg.removeHeader(HttpHeaderNames.CONTENT_TYPE.toString());
outboundResponseMsg.waitAndReleaseAllEntities();
Expand All @@ -143,31 +130,35 @@ public static Object nativeRespondWithDataCtx(Environment env, BObject connectio
observerContext.addProperty(PROPERTY_KEY_HTTP_STATUS_CODE, statusCode);
}
}
try {
if (pipeliningRequired(inboundRequestMsg)) {
if (log.isDebugEnabled()) {
log.debug("Pipelining is required. Sequence id of the request: {}",
inboundRequestMsg.getSequenceId());

return env.yieldAndRun(() -> {
try {
CompletableFuture<Object> balFuture = new CompletableFuture<>();
DataContext dataContext = new DataContext(env, balFuture, HttpUtil.getCarbonMsg(connectionObj, null));
if (pipeliningRequired(inboundRequestMsg)) {
if (log.isDebugEnabled()) {
log.debug("Pipelining is required. Sequence id of the request: {}",
inboundRequestMsg.getSequenceId());
}
PipelinedResponse pipelinedResponse = new PipelinedResponse(inboundRequestMsg, outboundResponseMsg,
dataContext, outboundResponseObj);
setPipeliningListener(outboundResponseMsg);
executePipeliningLogic(inboundRequestMsg.getSourceContext(), pipelinedResponse);
} else {
sendOutboundResponseRobust(dataContext, inboundRequestMsg, outboundResponseObj,
outboundResponseMsg);
}
PipelinedResponse pipelinedResponse = new PipelinedResponse(inboundRequestMsg, outboundResponseMsg,
dataContext, outboundResponseObj);
setPipeliningListener(outboundResponseMsg);
executePipeliningLogic(inboundRequestMsg.getSourceContext(), pipelinedResponse);
} else {
sendOutboundResponseRobust(dataContext, inboundRequestMsg, outboundResponseObj, outboundResponseMsg);
return getResult(balFuture);
} catch (BError e) {
log.debug(e.getPrintableStackTrace(), e);
return HttpUtil.createHttpError(e.getMessage(), HttpErrorType.GENERIC_LISTENER_ERROR);
} catch (Throwable e) {
//Exception is already notified by http transport.
String errorMessage = "Couldn't complete outbound response: " + e.getMessage();
log.debug(errorMessage, e);
return HttpUtil.createHttpError(errorMessage, HttpErrorType.GENERIC_LISTENER_ERROR);
}
} catch (BError e) {
log.debug(e.getPrintableStackTrace(), e);
dataContext.getFuture().complete(
HttpUtil.createHttpError(e.getMessage(), HttpErrorType.GENERIC_LISTENER_ERROR));
} catch (Throwable e) {
//Exception is already notified by http transport.
String errorMessage = "Couldn't complete outbound response: " + e.getMessage();
log.debug(errorMessage, e);
dataContext.getFuture().complete(
HttpUtil.createHttpError(errorMessage, HttpErrorType.GENERIC_LISTENER_ERROR));
}
return null;
});
}

private static void setCacheControlHeader(BObject outboundRespObj, HttpCarbonMessage outboundResponse) {
Expand All @@ -187,8 +178,7 @@ private static boolean isDirtyResponse(BObject outboundResponseObj) {
private Respond() {}

public static boolean invokeResponseInterceptor(Environment env, HttpCarbonMessage inboundMessage,
BObject outboundResponseObj, BObject callerObj,
DataContext dataContext) {
BObject outboundResponseObj, BObject callerObj) {
List<HTTPInterceptorServicesRegistry> interceptorServicesRegistries =
(List<HTTPInterceptorServicesRegistry>) inboundMessage.getProperty(INTERCEPTOR_SERVICES_REGISTRIES);
if (interceptorServicesRegistries.isEmpty()) {
Expand Down Expand Up @@ -221,7 +211,7 @@ public static boolean invokeResponseInterceptor(Environment env, HttpCarbonMessa
interceptorServiceIndex -= 1;
inboundMessage.setProperty(HttpConstants.RESPONSE_INTERCEPTOR_INDEX, interceptorServiceIndex);
startInterceptResponseMethod(inboundMessage, outboundResponseObj, callerObj, service, env,
interceptorServicesRegistry, dataContext);
interceptorServicesRegistry);
return true;
} catch (Exception e) {
throw HttpUtil.createHttpError(e.getMessage(), HttpErrorType.GENERIC_LISTENER_ERROR);
Expand All @@ -230,7 +220,7 @@ public static boolean invokeResponseInterceptor(Environment env, HttpCarbonMessa
// Handling error panics
if (inboundMessage.isInterceptorError()) {
HttpResponseInterceptorUnitCallback callback = new HttpResponseInterceptorUnitCallback(inboundMessage,
callerObj, outboundResponseObj, env, dataContext, null, false);
callerObj, outboundResponseObj, env, null, false);
callback.sendFailureResponse((BError) inboundMessage.getProperty(HttpConstants.INTERCEPTOR_SERVICE_ERROR));
}
return false;
Expand All @@ -248,15 +238,13 @@ private static int getResponseInterceptorIndex(HttpCarbonMessage inboundMessage,

private static void startInterceptResponseMethod(HttpCarbonMessage inboundMessage, BObject outboundResponseObj,
BObject callerObj, InterceptorService service, Environment env,
HTTPInterceptorServicesRegistry interceptorServicesRegistry,
DataContext dataContext) {
HTTPInterceptorServicesRegistry interceptorServicesRegistry) {
BObject serviceObj = service.getBalService();
Runtime runtime = interceptorServicesRegistry.getRuntime();
Object[] signatureParams = HttpDispatcher.getRemoteSignatureParameters(service, outboundResponseObj, callerObj,
inboundMessage, runtime);
HttpResponseInterceptorUnitCallback callback = new HttpResponseInterceptorUnitCallback(inboundMessage,
callerObj, outboundResponseObj,
env, dataContext, runtime, interceptorServicesRegistry.isPossibleLastInterceptor());
callerObj, outboundResponseObj, env, runtime, interceptorServicesRegistry.isPossibleLastInterceptor());

inboundMessage.removeProperty(HttpConstants.INTERCEPTOR_SERVICE_ERROR);
String methodName = service.getServiceType().equals(HttpConstants.RESPONSE_ERROR_INTERCEPTOR)
Expand Down

0 comments on commit 6e5c05c

Please sign in to comment.