Skip to content

Commit

Permalink
improve RpcInvokeCallbackListener resolve response
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeCqupt committed Jan 10, 2024
1 parent 829be55 commit dca34c2
Showing 1 changed file with 32 additions and 71 deletions.
103 changes: 32 additions & 71 deletions src/main/java/com/alipay/remoting/rpc/RpcInvokeCallbackListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,101 +107,62 @@ public CallbackTask(String remoteAddress, InvokeFuture future) {
public void run() {
InvokeCallback callback = future.getInvokeCallback();
// a lot of try-catches to protect thread pool
ResponseCommand response = null;

try {
response = (ResponseCommand) future.waitResponse(0);
} catch (InterruptedException e) {
String msg = "Exception caught when getting response from InvokeFuture. The address is "
+ this.remoteAddress;
logger.error(msg, e);
}
if (response == null || response.getResponseStatus() != ResponseStatus.SUCCESS) {
ResponseCommand response = null;

try {
Exception e;
if (response == null) {
e = new InvokeException("Exception caught in invocation. The address is "
+ this.remoteAddress + " responseStatus:"
+ ResponseStatus.UNKNOWN, future.getCause());
} else {
response.setInvokeContext(future.getInvokeContext());
switch (response.getResponseStatus()) {
case TIMEOUT:
e = new InvokeTimeoutException(
"Invoke timeout when invoke with callback.The address is "
+ this.remoteAddress);
break;
case CONNECTION_CLOSED:
e = new ConnectionClosedException(
"Connection closed when invoke with callback.The address is "
+ this.remoteAddress);
break;
case SERVER_THREADPOOL_BUSY:
e = new InvokeServerBusyException(
"Server thread pool busy when invoke with callback.The address is "
+ this.remoteAddress);
break;
case SERVER_EXCEPTION:
String msg = "Server exception when invoke with callback.Please check the server log! The address is "
+ this.remoteAddress;
RpcResponseCommand resp = (RpcResponseCommand) response;
resp.deserialize();
Object ex = resp.getResponseObject();
if (ex instanceof Throwable) {
e = new InvokeServerException(msg, (Throwable) ex);
} else {
e = new InvokeServerException(msg);
}
break;
default:
e = new InvokeException(
"Exception caught in invocation. The address is "
+ this.remoteAddress + " responseStatus:"
+ response.getResponseStatus(), future.getCause());
response = (ResponseCommand) future.waitResponse(0);
} catch (InterruptedException e) {
String msg = "Exception caught when getting response from InvokeFuture. The address is "
+ this.remoteAddress;
logger.error(msg, e);
throw new InvokeException(msg, e);
}

}
}
callback.onException(e);
} catch (Throwable e) {
logger
.error(
"Exception occurred in user defined InvokeCallback#onException() logic, The address is {}",
this.remoteAddress, e);
if (response == null) {
throw new InvokeException("Exception caught in invocation. The address is "
+ this.remoteAddress + " responseStatus:"
+ ResponseStatus.UNKNOWN, future.getCause());
}
} else {

Object responseObj = RpcResponseResolver.resolveResponseObject(response,
this.remoteAddress);

ClassLoader oldClassLoader = null;
try {
if (future.getAppClassLoader() != null) {
oldClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(future.getAppClassLoader());
}
response.setInvokeContext(future.getInvokeContext());
RpcResponseCommand rpcResponse = (RpcResponseCommand) response;
response.deserialize();
try {
callback.onResponse(rpcResponse.getResponseObject());
callback.onResponse(responseObj);
} catch (Throwable e) {
logger
.error(
"Exception occurred in user defined InvokeCallback#onResponse() logic.",
e);
}
} catch (CodecException e) {
logger
.error(
"CodecException caught on when deserialize response in RpcInvokeCallbackListener. The address is {}.",
this.remoteAddress, e);
} catch (Throwable e) {
logger.error(
"Exception caught in RpcInvokeCallbackListener. The address is {}",
this.remoteAddress, e);

} finally {
if (oldClassLoader != null) {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
} // enf of else
} // end of run

} catch (Throwable t) {
try {
callback.onException(t);
} catch (Throwable te) {
logger
.error(
"Exception occurred in user defined InvokeCallback#onException() logic, The address is {}",
this.remoteAddress, te);
}
}

}
}

/**
Expand Down

0 comments on commit dca34c2

Please sign in to comment.