From dca34c24621e0c7aae71265fcaec3fa07e25eb0f Mon Sep 17 00:00:00 2001 From: jiangyuan04 Date: Wed, 10 Jan 2024 21:28:56 +0800 Subject: [PATCH] improve RpcInvokeCallbackListener resolve response --- .../rpc/RpcInvokeCallbackListener.java | 103 ++++++------------ 1 file changed, 32 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/alipay/remoting/rpc/RpcInvokeCallbackListener.java b/src/main/java/com/alipay/remoting/rpc/RpcInvokeCallbackListener.java index 424b927c..2a274f31 100644 --- a/src/main/java/com/alipay/remoting/rpc/RpcInvokeCallbackListener.java +++ b/src/main/java/com/alipay/remoting/rpc/RpcInvokeCallbackListener.java @@ -107,68 +107,28 @@ public CallbackTask(String remoteAddress, InvokeFuture future) { public void run() { InvokeCallback callback = future.getInvokeCallback(); // a lot of try-catches to protect thread pool - ResponseCommand response = null; try { - response = (ResponseCommand) future.waitResponse(0); - } catch (InterruptedException e) { - String msg = "Exception caught when getting response from InvokeFuture. The address is " - + this.remoteAddress; - logger.error(msg, e); - } - if (response == null || response.getResponseStatus() != ResponseStatus.SUCCESS) { + ResponseCommand response = null; + try { - Exception e; - if (response == null) { - e = new InvokeException("Exception caught in invocation. The address is " - + this.remoteAddress + " responseStatus:" - + ResponseStatus.UNKNOWN, future.getCause()); - } else { - response.setInvokeContext(future.getInvokeContext()); - switch (response.getResponseStatus()) { - case TIMEOUT: - e = new InvokeTimeoutException( - "Invoke timeout when invoke with callback.The address is " - + this.remoteAddress); - break; - case CONNECTION_CLOSED: - e = new ConnectionClosedException( - "Connection closed when invoke with callback.The address is " - + this.remoteAddress); - break; - case SERVER_THREADPOOL_BUSY: - e = new InvokeServerBusyException( - "Server thread pool busy when invoke with callback.The address is " - + this.remoteAddress); - break; - case SERVER_EXCEPTION: - String msg = "Server exception when invoke with callback.Please check the server log! The address is " - + this.remoteAddress; - RpcResponseCommand resp = (RpcResponseCommand) response; - resp.deserialize(); - Object ex = resp.getResponseObject(); - if (ex instanceof Throwable) { - e = new InvokeServerException(msg, (Throwable) ex); - } else { - e = new InvokeServerException(msg); - } - break; - default: - e = new InvokeException( - "Exception caught in invocation. The address is " - + this.remoteAddress + " responseStatus:" - + response.getResponseStatus(), future.getCause()); + response = (ResponseCommand) future.waitResponse(0); + } catch (InterruptedException e) { + String msg = "Exception caught when getting response from InvokeFuture. The address is " + + this.remoteAddress; + logger.error(msg, e); + throw new InvokeException(msg, e); + } - } - } - callback.onException(e); - } catch (Throwable e) { - logger - .error( - "Exception occurred in user defined InvokeCallback#onException() logic, The address is {}", - this.remoteAddress, e); + if (response == null) { + throw new InvokeException("Exception caught in invocation. The address is " + + this.remoteAddress + " responseStatus:" + + ResponseStatus.UNKNOWN, future.getCause()); } - } else { + + Object responseObj = RpcResponseResolver.resolveResponseObject(response, + this.remoteAddress); + ClassLoader oldClassLoader = null; try { if (future.getAppClassLoader() != null) { @@ -176,32 +136,33 @@ public void run() { Thread.currentThread().setContextClassLoader(future.getAppClassLoader()); } response.setInvokeContext(future.getInvokeContext()); - RpcResponseCommand rpcResponse = (RpcResponseCommand) response; - response.deserialize(); try { - callback.onResponse(rpcResponse.getResponseObject()); + callback.onResponse(responseObj); } catch (Throwable e) { logger .error( "Exception occurred in user defined InvokeCallback#onResponse() logic.", e); } - } catch (CodecException e) { - logger - .error( - "CodecException caught on when deserialize response in RpcInvokeCallbackListener. The address is {}.", - this.remoteAddress, e); - } catch (Throwable e) { - logger.error( - "Exception caught in RpcInvokeCallbackListener. The address is {}", - this.remoteAddress, e); + } finally { if (oldClassLoader != null) { Thread.currentThread().setContextClassLoader(oldClassLoader); } } - } // enf of else - } // end of run + + } catch (Throwable t) { + try { + callback.onException(t); + } catch (Throwable te) { + logger + .error( + "Exception occurred in user defined InvokeCallback#onException() logic, The address is {}", + this.remoteAddress, te); + } + } + + } } /**