From 55577134c704f61934853ca817cc8887a5b745a5 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Wed, 29 May 2024 14:25:32 +0800 Subject: [PATCH 1/4] =?UTF-8?q?:bookmark:=20=E5=A2=9E=E5=8A=A0=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E5=BC=95=E6=93=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 + .../common/engine/model/EngineContext.java | 22 +++ .../common/engine/model/EngineRunData.java | 15 ++ .../luna/common/engine/model/NodeChain.java | 88 +++++++++ .../luna/common/engine/model/NodeConf.java | 24 +++ .../luna/common/engine/model/NodeName.java | 55 ++++++ .../engine/task/AbstractEngineExecute.java | 178 ++++++++++++++++++ .../engine/task/AbstractEngineNode.java | 16 ++ .../luna/common/engine/task/EngineNode.java | 43 +++++ .../common/engine/task/NodeParallelTask.java | 59 ++++++ .../engine/task/TradeEngineCallable.java | 19 ++ .../com/luna/common/reflect/ReflectUtils.java | 51 +++++ .../luna/common/spring/SpringBeanService.java | 95 ++++++++++ .../common/thread/CommonThreadPoolUtil.java | 52 +++-- .../com/luna/common/utils/EngineNodeTest.java | 94 +++++++++ .../com/luna/common/utils/RestUtilsTest.java | 6 + 16 files changed, 802 insertions(+), 20 deletions(-) create mode 100644 src/main/java/com/luna/common/engine/model/EngineContext.java create mode 100644 src/main/java/com/luna/common/engine/model/EngineRunData.java create mode 100644 src/main/java/com/luna/common/engine/model/NodeChain.java create mode 100644 src/main/java/com/luna/common/engine/model/NodeConf.java create mode 100644 src/main/java/com/luna/common/engine/model/NodeName.java create mode 100644 src/main/java/com/luna/common/engine/task/AbstractEngineExecute.java create mode 100644 src/main/java/com/luna/common/engine/task/AbstractEngineNode.java create mode 100644 src/main/java/com/luna/common/engine/task/EngineNode.java create mode 100644 src/main/java/com/luna/common/engine/task/NodeParallelTask.java create mode 100644 src/main/java/com/luna/common/engine/task/TradeEngineCallable.java create mode 100644 src/main/java/com/luna/common/spring/SpringBeanService.java create mode 100644 src/test/java/com/luna/common/utils/EngineNodeTest.java diff --git a/pom.xml b/pom.xml index dbd3c3dc4..4cd9e684e 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,11 @@ jaxb-runtime 4.0.5 + + org.springframework + spring-context + 6.1.5 + org.slf4j slf4j-simple diff --git a/src/main/java/com/luna/common/engine/model/EngineContext.java b/src/main/java/com/luna/common/engine/model/EngineContext.java new file mode 100644 index 000000000..e8841ea37 --- /dev/null +++ b/src/main/java/com/luna/common/engine/model/EngineContext.java @@ -0,0 +1,22 @@ +package com.luna.common.engine.model; + +import com.google.common.collect.Maps; +import lombok.Data; + +import java.util.Map; + +@Data +public class EngineContext { + + /** + * adaptor的结果缓存 + */ + private Map adaptorMap = Maps.newConcurrentMap(); + + /** + * engin是否继续执行 + */ + private boolean isStop; + +} \ No newline at end of file diff --git a/src/main/java/com/luna/common/engine/model/EngineRunData.java b/src/main/java/com/luna/common/engine/model/EngineRunData.java new file mode 100644 index 000000000..ec114da60 --- /dev/null +++ b/src/main/java/com/luna/common/engine/model/EngineRunData.java @@ -0,0 +1,15 @@ +package com.luna.common.engine.model; + +import lombok.Data; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author luna + */ +@Data +public class EngineRunData { + + private Map runData = new ConcurrentHashMap<>(); +} \ No newline at end of file diff --git a/src/main/java/com/luna/common/engine/model/NodeChain.java b/src/main/java/com/luna/common/engine/model/NodeChain.java new file mode 100644 index 000000000..0385d6e11 --- /dev/null +++ b/src/main/java/com/luna/common/engine/model/NodeChain.java @@ -0,0 +1,88 @@ +package com.luna.common.engine.model; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.collect.Maps; + +import lombok.Getter; +import lombok.Setter; + +/** + *

+ * detailNode的存储类 + */ +@Getter +@Setter +public class NodeChain { + + private Map nodeMap = Maps.newLinkedHashMap(); + + private static NodeName getNodeName(String groupName, Class nodeClass) { + NodeName nodeName; + if (StringUtils.isNotBlank(groupName)) { + nodeName = new NodeName(groupName, nodeClass.getName()); + } else { + nodeName = new NodeName(null, nodeClass.getName()); + } + return nodeName; + } + + public void add(String groupName, Class nodeClass, NodeConf nodeConf) { + NodeName nodeName = getNodeName(groupName, nodeClass); + add(nodeName, nodeConf); + } + + public void add(Class nodeName, NodeConf nodeConf) { + add(nodeName.getName(), nodeName, nodeConf); + } + + public void add(NodeName nodeName, NodeConf nodeConf) { + if (nodeMap.containsKey(nodeName)) { + return; + } + nodeMap.put(nodeName, nodeConf); + } + + public void replace(String groupName, Class nodeClass, NodeConf nodeConf) { + NodeName nodeName = getNodeName(groupName, nodeClass); + + nodeMap.put(nodeName, nodeConf); + } + + public void replace(NodeName nodeName, NodeConf nodeConf) { + nodeMap.put(nodeName, nodeConf); + } + + public void replace(Class nodeName, NodeConf nodeConf) { + replace(nodeName.getName(), nodeName, nodeConf); + } + + public void remove(Class nodeName) { + remove(nodeName.getName(), nodeName); + } + + public void remove(String groupName, Class nodeClass) { + NodeName nodeName = getNodeName(groupName, nodeClass); + nodeMap.remove(nodeName); + } + + public Set getNodeNameList() { + return getNodeList().stream().map(NodeName::getNodeName).collect(Collectors.toSet()); + } + + public Set getNodeList() { + return nodeMap.keySet(); + } + + public NodeChain deepClone() { + LinkedHashMap cloneMap = new LinkedHashMap<>(nodeMap); + NodeChain nodeChain = new NodeChain(); + nodeChain.setNodeMap(cloneMap); + return nodeChain; + } +} diff --git a/src/main/java/com/luna/common/engine/model/NodeConf.java b/src/main/java/com/luna/common/engine/model/NodeConf.java new file mode 100644 index 000000000..8c5ba6b09 --- /dev/null +++ b/src/main/java/com/luna/common/engine/model/NodeConf.java @@ -0,0 +1,24 @@ +package com.luna.common.engine.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 节点配置 + */ +@AllArgsConstructor +@NoArgsConstructor +@Data +public class NodeConf { + + /** + * 是否是强依赖 + */ + private boolean isStrongRely = true; + /** + * 并行执行超时时间 + * 默认200ms + */ + private int timeout = 200; +} diff --git a/src/main/java/com/luna/common/engine/model/NodeName.java b/src/main/java/com/luna/common/engine/model/NodeName.java new file mode 100644 index 000000000..61f4e4076 --- /dev/null +++ b/src/main/java/com/luna/common/engine/model/NodeName.java @@ -0,0 +1,55 @@ +package com.luna.common.engine.model; + +import com.luna.common.check.Assert; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Objects; + +/** + * 节点Key + */ +@NoArgsConstructor +@Data +public class NodeName { + + /** + * 节点组 + */ + private String groupName; + /** + * 节点名称 + */ + private String nodeName; + + public NodeName(String groupName, String nodeName) { + Assert.notNull(nodeName, "节点名称不能为空"); + this.groupName = groupName; + this.nodeName = nodeName; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + NodeName nodeName1 = (NodeName)o; + return Objects.equals(groupName, nodeName1.groupName) && Objects.equals(nodeName, nodeName1.nodeName); + } + + @Override + public int hashCode() { + return Objects.hash(groupName, nodeName); + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("NodeName{"); + sb.append("groupName='").append(groupName).append('\''); + sb.append(", nodeName='").append(nodeName).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/src/main/java/com/luna/common/engine/task/AbstractEngineExecute.java b/src/main/java/com/luna/common/engine/task/AbstractEngineExecute.java new file mode 100644 index 000000000..f82ea236e --- /dev/null +++ b/src/main/java/com/luna/common/engine/task/AbstractEngineExecute.java @@ -0,0 +1,178 @@ +package com.luna.common.engine.task; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.luna.common.engine.model.*; +import com.luna.common.exception.BaseException; +import com.luna.common.spring.SpringBeanService; +import com.luna.common.thread.CommonThreadPoolUtil; + +import lombok.extern.slf4j.Slf4j; + +/** + * @Description: EngineNode的执行引擎,会串行执行一系列的EngineNode。同一个group的EngineNode会并行执行 + * @Author: + * @Modified by: + */ +@Slf4j +public abstract class AbstractEngineExecute { + + private static CommonThreadPoolUtil COMMON_THREAD_POOL_UTIL = new CommonThreadPoolUtil(); + + /** + * 引擎执行入口 + * + * @param nodeChainChain + * @param engineRunData + * @param engineContext + * @throws Exception + */ + @SneakyThrows + public T execute(NodeChain nodeChainChain, EngineRunData engineRunData, EngineContext engineContext) { + + Map> nodeGroup = groupByGroupName(nodeChainChain); + + Map nodeMap = nodeChainChain.getNodeMap(); + for (String groupName : nodeGroup.keySet()) { + BaseException exp = null; + boolean needThrowExp = false; + List nodeNameList = nodeGroup.get(groupName); + // 只有一个Node的节点,串行执行 + if (nodeNameList.size() == 1) { + NodeName nodeName = nodeNameList.get(0); + + EngineNode detailNode = (EngineNode)SpringBeanService.getSingleBeanByType(Class.forName(nodeName.getNodeName())); + NodeParallelTask nodeParallelTask = new NodeParallelTask(detailNode, engineRunData, engineContext); + try { + Object result = nodeParallelTask.execute(); + engineContext.getAdaptorMap().put(detailNode.resultKey(), result); + } catch (BaseException e) { + needThrowExp = isStrongRely(nodeMap, nodeName); + if (!isStrongRely(nodeMap, nodeName)) { + // 注意:只有非强依赖的节点我们才打印日志,因为强依赖的节点,会把异常抛出终结流程,那么异常一定会在thor层面被捕获并且打印堆栈,会造成异常重复打印 + // 并且由于跑出来的异常是DetailException业务日志,所以我们把异常信息打印到monitor-biz.log中 + log.warn("detailnode occur exception !!!", e); + } + exp = e; + } catch (Exception e) { + needThrowExp = isStrongRely(nodeMap, nodeName); + if (!isStrongRely(nodeMap, nodeName)) { + // 注意:只有非强依赖的节点我们才打印日志,因为强依赖的节点,会把异常抛出终结流程,那么异常一定会在thor层面被捕获并且打印堆栈,会造成异常重复打印 + log.error("detailEngine execute throw unknown Exception 2", e); + } + exp = BaseException.SYSTEM_ERROR; + } + } else { // 多个个Node的组合节点,并行执行 + List resultList = new ArrayList<>(); + List executedNodeNameList = new ArrayList<>(); + List executedNodeList = new ArrayList<>(); + for (NodeName nodeName : nodeNameList) { + EngineNode detailNode = (EngineNode)SpringBeanService.getSingleBeanByType(Class.forName(nodeName.getNodeName())); + if (!detailNode.couldContinueExecute(engineContext)) { + continue; + } + + NodeParallelTask nodeParallelTask = new NodeParallelTask(detailNode, engineRunData, engineContext); + executedNodeList.add(nodeParallelTask); + executedNodeNameList.add(nodeName); + resultList.add(COMMON_THREAD_POOL_UTIL.getThreadPool().submit(nodeParallelTask)); + } + for (int i = 0; i < resultList.size(); i++) { + NodeName nodeName = executedNodeNameList.get(i); + EngineNode detailNode = (EngineNode)SpringBeanService.getSingleBeanByType(Class.forName(nodeName.getNodeName())); + NodeConf nodeConf = nodeMap.get(nodeName); + boolean strongRely = nodeConf.isStrongRely(); + int timeout = nodeConf.getTimeout(); + Future future = resultList.get(i); + try { + Object o = future.get(timeout, TimeUnit.MILLISECONDS); + engineContext.getAdaptorMap().put(detailNode.resultKey(), o); + } catch (ExecutionException e) { + needThrowExp = strongRely; + Throwable cause = e.getCause(); + if (cause instanceof BaseException) { + if (!strongRely) { + // 注意:只有非强依赖的节点我们才打印日志,因为强依赖的节点,会把异常抛出终结流程,那么异常一定会在thor层面被捕获并且打印堆栈,会造成异常重复打印 + // 并且由于跑出来的异常是DetailException业务日志,所以我们把异常信息打印到monitor-biz.log中 + log.warn("parallel detailnode occur cexception !!!", e); + } + exp = (BaseException)cause; + } + } catch (TimeoutException e) { + // 超时直接打warn + needThrowExp = strongRely; + if (!strongRely) { + // 非强依赖的节点我们才打印日志,因为强依赖的节点,会把异常抛出终结流程,那么异常一定会在thor层面被捕获并且打印堆栈,会造成异常重复打印 + log.warn(String.format("detailEngine execute timeout. nodeName:%s", nodeName)); + } + exp = BaseException.SYSTEM_ERROR; + + } catch (Exception e) { + if (!strongRely) { + log.error(String.format("detailEngine execute error. nodeName:%s", nodeName), e); + } + needThrowExp = strongRely; + exp = BaseException.SYSTEM_ERROR; + } + } + } + if (needThrowExp) { + throw exp; + } + } + return assembleModel(engineRunData, engineContext); + } + + /** + * 是否强依赖节点 + * + * @param nodeMap + * @param nodeKey + * @return + */ + private boolean isStrongRely(Map nodeMap, NodeName nodeKey) { + return nodeMap.get(nodeKey) != null && nodeMap.get(nodeKey).isStrongRely(); + } + + public abstract T assembleModel(EngineRunData engineRunData, EngineContext context); + + /** + * 按groupName分组, 没有group的Node放在一组,groupName相同的放到一组 + * + * @param nodeChainChain + * @return + */ + private Map> groupByGroupName(NodeChain nodeChainChain) { + Map> nodegroup = Maps.newLinkedHashMap(); + for (NodeName nodeKey : nodeChainChain.getNodeList()) { + String groupName = nodeKey.getGroupName(); + String nodeName = nodeKey.getNodeName(); + + if (StringUtils.isBlank(groupName)) { + List nodeNameList = Lists.newArrayList(); + nodeNameList.add(nodeKey); + nodegroup.put(nodeName, nodeNameList); + } else { + List nodeNameList = nodegroup.get(groupName); + if (nodeNameList == null) { + nodeNameList = Lists.newArrayList(); + } + nodeNameList.add(nodeKey); + nodegroup.put(groupName, nodeNameList); + } + } + return nodegroup; + } +} diff --git a/src/main/java/com/luna/common/engine/task/AbstractEngineNode.java b/src/main/java/com/luna/common/engine/task/AbstractEngineNode.java new file mode 100644 index 000000000..9e499a31a --- /dev/null +++ b/src/main/java/com/luna/common/engine/task/AbstractEngineNode.java @@ -0,0 +1,16 @@ +package com.luna.common.engine.task; + +import com.luna.common.engine.model.EngineContext; + +/** + * @author luna + */ +public abstract class AbstractEngineNode implements EngineNode { + @Override + public boolean couldContinueExecute(EngineContext engineContext) { + if (engineContext.isStop()) { + return false; + } + return true; + } +} diff --git a/src/main/java/com/luna/common/engine/task/EngineNode.java b/src/main/java/com/luna/common/engine/task/EngineNode.java new file mode 100644 index 000000000..6bded41c2 --- /dev/null +++ b/src/main/java/com/luna/common/engine/task/EngineNode.java @@ -0,0 +1,43 @@ +package com.luna.common.engine.task; + +import com.luna.common.engine.model.EngineContext; +import com.luna.common.engine.model.EngineRunData; + +/** + * + * 业务节点类,有一系列的EngineNode组合而成 + */ +public interface EngineNode { + + /** + * Node的执行方法 + * + * @param nodeData nodeData + * @param engineContext engineContext + */ + T invokeNode(EngineRunData nodeData, EngineContext engineContext); + + /** + * node执行完后执行的方法 + * + * @param nodeData nodeData + * @param engineContext engineContext + */ + void afterInvoke(EngineRunData nodeData, EngineContext engineContext); + + /** + * 从EngineContext中获取此node结果的key + * + * @return String + */ + String resultKey(); + + /** + * 是否可以执行,按照上下文控制 + * + * @param engineContext + * @return + */ + boolean couldContinueExecute(EngineContext engineContext); + +} diff --git a/src/main/java/com/luna/common/engine/task/NodeParallelTask.java b/src/main/java/com/luna/common/engine/task/NodeParallelTask.java new file mode 100644 index 000000000..9bd26133f --- /dev/null +++ b/src/main/java/com/luna/common/engine/task/NodeParallelTask.java @@ -0,0 +1,59 @@ +package com.luna.common.engine.task; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.luna.common.engine.model.EngineContext; +import com.luna.common.engine.model.EngineRunData; +import com.luna.common.exception.BaseException; + +/** + * Node节点的执行类 + */ +public class NodeParallelTask extends TradeEngineCallable { + + private static final Logger LOGGER = LoggerFactory.getLogger(NodeParallelTask.class); + /** + * 是否打node执行超过100ms日志 + */ + public static volatile boolean PRINT_NODE_EXECUTE_TIME_LOG = false; + /** + * 是否打印性能分析日志 + */ + public static volatile boolean PRINT_PERF_ANALYZE_LOG = false; + private EngineNode engineNode; + private EngineRunData engineRunData; + private EngineContext engineContext; + + public NodeParallelTask(EngineNode engineNode, EngineRunData engineRunData, EngineContext engineContext) { + this.engineNode = engineNode; + this.engineRunData = engineRunData; + this.engineContext = engineContext; + } + + @Override + Object _call() throws BaseException { + return execute(); + } + + public Object execute() throws BaseException { + long start = System.currentTimeMillis(); + try { + // Node前置检查 + Object o = engineNode.invokeNode(engineRunData, engineContext); + engineNode.afterInvoke(engineRunData, engineContext); + // 后置处理 + return o; + } finally { + long end = System.currentTimeMillis(); + if (PRINT_NODE_EXECUTE_TIME_LOG && (end - start) > 100) { + LOGGER.warn("fatal error, please notice. execute node:{} exceed times " + (end - start) + " millisecond ", + engineNode.getClass().getName()); + } + if (PRINT_PERF_ANALYZE_LOG) { + LOGGER.warn("==========execute node:{} total use " + (end - start) + " millisecond ===========", engineNode.getClass().getName()); + } + } + } + +} diff --git a/src/main/java/com/luna/common/engine/task/TradeEngineCallable.java b/src/main/java/com/luna/common/engine/task/TradeEngineCallable.java new file mode 100644 index 000000000..5c2eb05b7 --- /dev/null +++ b/src/main/java/com/luna/common/engine/task/TradeEngineCallable.java @@ -0,0 +1,19 @@ +package com.luna.common.engine.task; + +import java.util.concurrent.Callable; + +/** + * + * 交易engine线程执行的公共父类 + * + */ +public abstract class TradeEngineCallable implements Callable { + + @Override + public V call() throws Exception { + return _call(); + } + + abstract V _call() throws Exception; + +} diff --git a/src/main/java/com/luna/common/reflect/ReflectUtils.java b/src/main/java/com/luna/common/reflect/ReflectUtils.java index 4f7e561bb..5014c8530 100644 --- a/src/main/java/com/luna/common/reflect/ReflectUtils.java +++ b/src/main/java/com/luna/common/reflect/ReflectUtils.java @@ -7,6 +7,9 @@ import org.apache.commons.lang3.Validate; import com.luna.common.date.DateUtils; +import org.springframework.aop.framework.AdvisedSupport; +import org.springframework.aop.framework.AopProxy; +import org.springframework.aop.support.AopUtils; /** * 反射工具类. 提供调用getter/setter方法, 访问私有变量, 调用私有方法, 获取泛型类型Class, 被AOP过的真实类等工具函数. @@ -21,6 +24,54 @@ public class ReflectUtils { private static final String CGLIB_CLASS_SEPARATOR = "$$"; + /** + * 获取 目标对象 + * + * @param proxy 代理对象 + * @return + * @throws Exception + */ + public static Object getTarget(Object proxy) throws Exception { + + if (!AopUtils.isAopProxy(proxy)) { + return proxy; + // 不是代理对象 + } + + if (AopUtils.isJdkDynamicProxy(proxy)) { + return getJdkDynamicProxyTargetObject(proxy); + } else { // cglib + return getCglibProxyTargetObject(proxy); + } + + } + + private static Object getCglibProxyTargetObject(Object proxy) throws Exception { + Field h = proxy.getClass().getDeclaredField("CGLIB$CALLBACK_0"); + h.setAccessible(true); + Object dynamicAdvisedInterceptor = h.get(proxy); + + Field advised = dynamicAdvisedInterceptor.getClass().getDeclaredField("advised"); + advised.setAccessible(true); + + Object target = ((AdvisedSupport)advised.get(dynamicAdvisedInterceptor)).getTargetSource().getTarget(); + + return target; + } + + private static Object getJdkDynamicProxyTargetObject(Object proxy) throws Exception { + Field h = proxy.getClass().getSuperclass().getDeclaredField("h"); + h.setAccessible(true); + AopProxy aopProxy = (AopProxy)h.get(proxy); + + Field advised = aopProxy.getClass().getDeclaredField("advised"); + advised.setAccessible(true); + + Object target = ((AdvisedSupport)advised.get(aopProxy)).getTargetSource().getTarget(); + + return target; + } + /** * 调用Getter方法. * 支持多级,如:对象名.对象名.方法 diff --git a/src/main/java/com/luna/common/spring/SpringBeanService.java b/src/main/java/com/luna/common/spring/SpringBeanService.java new file mode 100644 index 000000000..d86e345ad --- /dev/null +++ b/src/main/java/com/luna/common/spring/SpringBeanService.java @@ -0,0 +1,95 @@ +package com.luna.common.spring; + +/** + * Created by caiyichao on 17/8/29. + */ + +import java.util.Map; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +import com.google.common.collect.Maps; +import com.luna.common.exception.BaseException; +import com.luna.common.reflect.ReflectUtils; + +import lombok.extern.slf4j.Slf4j; + +/** + * Spring的bean加载服务类 + */ +@Component +@Slf4j +public class SpringBeanService implements ApplicationContextAware { + + /** + * spring bean上下文 + */ + protected static ApplicationContext applicationContext = null; + private static Map beanPool = Maps.newConcurrentMap(); + + /** + * 获取bean实例 + */ + public static T getBeanByName(String name, Class clazz) throws BeansException { + return applicationContext.getBean(name, clazz); + } + + /** + * 获取bean实例 + */ + public static Object getBeanByName(String name) throws BeansException { + return applicationContext.getBean(name); + } + + /** + * 获取此类型所有的bean + * + * @param clazz + * @return + * @throws BeansException + */ + public static Map getBeansOfType(Class clazz) { + return applicationContext.getBeansOfType(clazz); + } + + /** + * 根据class 类型获取bean + * + * @param clazz + * @return + * @throws BeansException 当有继承或者接口时(多个实现类)getBean(clazz)会报错 + * 所以通过class name比较来获取唯一那个bean + */ + public static T getSingleBeanByType(Class clazz) throws Exception { + // 由于后续的doGetSingleBeanByType()涉及到同步,高并发场景下会导致锁竞争,此处加缓存解决 + if (beanPool.get(clazz.getName()) != null) { + return (T)beanPool.get(clazz.getName()); + } + + final T bean = doGetSingleBeanByType(clazz); + beanPool.put(clazz.getName(), bean); + return bean; + } + + private static T doGetSingleBeanByType(Class clazz) throws Exception { + String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames(); + for (String beanName : beanDefinitionNames) { + Object beanByName = getBeanByName(beanName); + Object target = ReflectUtils.getTarget(beanByName); // 防止被代理导致拿不到bean + if (clazz.getName().equals(target.getClass().getName())) { + return (T)beanByName; + } + } + log.error("can not find bean by type, class = {}", clazz.getName()); + throw BaseException.SYSTEM_ERROR; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringBeanService.applicationContext = applicationContext; + } +} diff --git a/src/main/java/com/luna/common/thread/CommonThreadPoolUtil.java b/src/main/java/com/luna/common/thread/CommonThreadPoolUtil.java index 979fb4c28..6ebfc5a2c 100644 --- a/src/main/java/com/luna/common/thread/CommonThreadPoolUtil.java +++ b/src/main/java/com/luna/common/thread/CommonThreadPoolUtil.java @@ -13,6 +13,7 @@ import com.luna.common.dto.constant.ResultCode; import lombok.Data; +import lombok.NoArgsConstructor; /** * ClassName:CommenThreadPoolUtil
@@ -20,10 +21,11 @@ * */ @Data +@NoArgsConstructor public class CommonThreadPoolUtil { - private static final long KEEP_ALIVE_TIME = 0L; - private static final Logger log = LoggerFactory.getLogger(CommonThreadPoolUtil.class); + private static final long KEEP_ALIVE_TIME = 0L; + private static final Logger log = LoggerFactory.getLogger(CommonThreadPoolUtil.class); /** 核心线程数(默认初始化为10) */ private static volatile int cacheCorePoolSize = 5; /** 核心线程控制的最大数目 */ @@ -32,18 +34,41 @@ public class CommonThreadPoolUtil { private static volatile int blockingQueueWaitSize = 2000; /** 核心线程数自动调整的增量幅度 */ private static volatile int incrementCorePoolSize = 4; + /** 初始化线程对象ThreadLocal,重写initialValue(),保证ThreadLocal首次执行get方法时不会null异常 */ + private final ThreadLocal>> threadLocal = ThreadLocal.withInitial(ArrayList::new); /** 初始化线程池 */ - private static ThreadPoolExecutor threadPool = + private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(cacheCorePoolSize, cacheCorePoolSize, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>()); - /** 初始化线程对象ThreadLocal,重写initialValue(),保证ThreadLocal首次执行get方法时不会null异常 */ - private final ThreadLocal>> threadLocal = ThreadLocal.withInitial(ArrayList::new); - public synchronized static void refresh() { + public CommonThreadPoolUtil(Integer cacheCorePoolSize) { + if (cacheCorePoolSize != null) { + setCacheCorePoolSize(cacheCorePoolSize); + } + } + + public static void main(String[] args) { + CommonThreadPoolUtil commonThreadPoolUtil = new CommonThreadPoolUtil(); + commonThreadPoolUtil.setCacheCorePoolSize(20); + for (int i = 0; i < 100; i++) { + commonThreadPoolUtil.dealTask((Callable)() -> { + System.out.println("线程池执行任务"); + return "线程池执行任务"; + }); + } + ResultDTO objectResultDTO = commonThreadPoolUtil.obtainTaskFuture(); + System.out.println(objectResultDTO); + } + + public synchronized void refresh() { threadPool = new ThreadPoolExecutor(cacheCorePoolSize, maxCorePoolSize, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>()); } + public ThreadPoolExecutor getThreadPool() { + return threadPool; + } + /** * * dealTask:(线程池执行操作-包含每个进程返回结果).
@@ -148,7 +173,7 @@ private void dynamicTuningPoolSize() { cacheCorePoolSize = currentcorePoolSize; log.warn("动态改变线程池大小====原核心线程池数目为:" + corePoolSize + ";现累加为:" + currentcorePoolSize); } else { - log.warn("动态改变线程池大小====核心线程池数目已累加为:" + cacheCorePoolSize + ";不会继续无限增加"); + log.warn("动态改变线程池大小====核心线程池数目已累加为:" + blockingQueueWaitSize + ";不会继续无限增加"); } } } @@ -175,17 +200,4 @@ public void setCacheCorePoolSize(int cacheCorePoolSize) { threadPool.setMaximumPoolSize(cacheCorePoolSize); CommonThreadPoolUtil.cacheCorePoolSize = cacheCorePoolSize; } - - public static void main(String[] args) { - CommonThreadPoolUtil commonThreadPoolUtil = new CommonThreadPoolUtil(); - commonThreadPoolUtil.setCacheCorePoolSize(20); - for (int i = 0; i < 100; i++) { - commonThreadPoolUtil.dealTask((Callable)() -> { - System.out.println("线程池执行任务"); - return "线程池执行任务"; - }); - } - ResultDTO objectResultDTO = commonThreadPoolUtil.obtainTaskFuture(); - System.out.println(objectResultDTO); - } } diff --git a/src/test/java/com/luna/common/utils/EngineNodeTest.java b/src/test/java/com/luna/common/utils/EngineNodeTest.java new file mode 100644 index 000000000..9b33d3f27 --- /dev/null +++ b/src/test/java/com/luna/common/utils/EngineNodeTest.java @@ -0,0 +1,94 @@ +package com.luna.common.utils; + +import com.luna.common.spring.SpringBeanService; +import lombok.AllArgsConstructor; +import org.junit.Test; + +import com.luna.common.engine.model.EngineContext; +import com.luna.common.engine.model.EngineRunData; +import com.luna.common.engine.model.NodeChain; +import com.luna.common.engine.model.NodeConf; +import com.luna.common.engine.task.AbstractEngineExecute; +import com.luna.common.engine.task.AbstractEngineNode; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +/** + * @author luna + * @date 2024/5/29 + */ +@Slf4j +public class EngineNodeTest { + + @Configuration + public static class TestAdaptorConfig { + @Bean + public TestEngineNode testEngineNode() { + return new TestEngineNode(); + } + + @Bean + public SpringBeanService springBeanService() { + return new SpringBeanService(); + } + } + + @Test + public void atest() throws Exception { + AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(TestAdaptorConfig.class); + annotationConfigApplicationContext.start(); + EngineRunData engineRunData = new EngineRunData(); + EngineContext engineContext = new EngineContext(); + TestEngineExecute testEngineExecute = new TestEngineExecute(); + TestAdaptor execute = testEngineExecute.execute(EngineFlow.testLink, engineRunData, engineContext); + System.out.println(execute); + } + + @Component + public static class TestEngineNode extends AbstractEngineNode { + + @Override + public Object invokeNode(EngineRunData nodeData, EngineContext engineContext) { + log.info("invokeNode::nodeData = {}, engineContext = {}", nodeData, engineContext); + return new TestAdaptor("hello"); + } + + @Override + public void afterInvoke(EngineRunData nodeData, EngineContext engineContext) { + log.info("afterInvoke::nodeData = {}, engineContext = {}", nodeData, engineContext); + } + + @Override + public String resultKey() { + return "this_is_result_key"; + } + } + + @Data + @AllArgsConstructor + public static class TestAdaptor { + private String hello; + } + + @Data + public static class EngineFlow { + private static NodeChain testLink = new NodeChain(); + + static { + testLink.add(TestEngineNode.class, new NodeConf(true, 100)); + } + } + + public class TestEngineExecute extends AbstractEngineExecute { + + @Override + public TestAdaptor assembleModel(EngineRunData engineRunData, EngineContext context) { + return (TestAdaptor)context.getAdaptorMap().get("this_is_result_key"); + } + } +} diff --git a/src/test/java/com/luna/common/utils/RestUtilsTest.java b/src/test/java/com/luna/common/utils/RestUtilsTest.java index 0d72e740c..60a0ac030 100644 --- a/src/test/java/com/luna/common/utils/RestUtilsTest.java +++ b/src/test/java/com/luna/common/utils/RestUtilsTest.java @@ -1,5 +1,6 @@ package com.luna.common.utils; +import com.luna.common.net.HttpUtils; import org.junit.Assert; import org.junit.Test; @@ -30,4 +31,9 @@ public void test2Post() { Assert.assertNotNull(result); Assert.assertEquals("{\"success\":true,\"code\":1,\"message\":\"success\",\"data\":true}", result); } + + @Test + public void atest() { + // + } } From 1689ff2b1949ad590893de825796a79e5a6b9d9e Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Wed, 29 May 2024 14:26:09 +0800 Subject: [PATCH 2/4] =?UTF-8?q?:bookmark:=20=E5=A2=9E=E5=8A=A0=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E5=BC=95=E6=93=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4cd9e684e..3765e255e 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.github.lunasaw luna-common luna-common - 2.5.8 + 2.5.9 common is project which contains common utils https://github.com/lunasaw/luna-common From 91782ec79adf9b8a442e424a5d04a6b089010ea8 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Wed, 29 May 2024 14:41:20 +0800 Subject: [PATCH 3/4] =?UTF-8?q?:bookmark:=20=E5=A2=9E=E5=8A=A0=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E5=BC=95=E6=93=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/luna/common/utils/EngineNodeTest.java | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/src/test/java/com/luna/common/utils/EngineNodeTest.java b/src/test/java/com/luna/common/utils/EngineNodeTest.java index 9b33d3f27..0c44641f1 100644 --- a/src/test/java/com/luna/common/utils/EngineNodeTest.java +++ b/src/test/java/com/luna/common/utils/EngineNodeTest.java @@ -1,8 +1,10 @@ package com.luna.common.utils; -import com.luna.common.spring.SpringBeanService; -import lombok.AllArgsConstructor; import org.junit.Test; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; import com.luna.common.engine.model.EngineContext; import com.luna.common.engine.model.EngineRunData; @@ -10,13 +12,11 @@ import com.luna.common.engine.model.NodeConf; import com.luna.common.engine.task.AbstractEngineExecute; import com.luna.common.engine.task.AbstractEngineNode; +import com.luna.common.spring.SpringBeanService; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; /** * @author luna @@ -25,6 +25,19 @@ @Slf4j public class EngineNodeTest { + private static final String RESULT_KEY = "this_is_result_key"; + + @Test + public void atest() { + AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(TestAdaptorConfig.class); + annotationConfigApplicationContext.start(); + EngineRunData engineRunData = new EngineRunData(); + EngineContext engineContext = new EngineContext(); + TestEngineExecute testEngineExecute = new TestEngineExecute(); + TestAdaptor execute = testEngineExecute.execute(EngineFlow.testLink, engineRunData, engineContext); + System.out.println(execute); + } + @Configuration public static class TestAdaptorConfig { @Bean @@ -38,22 +51,11 @@ public SpringBeanService springBeanService() { } } - @Test - public void atest() throws Exception { - AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(TestAdaptorConfig.class); - annotationConfigApplicationContext.start(); - EngineRunData engineRunData = new EngineRunData(); - EngineContext engineContext = new EngineContext(); - TestEngineExecute testEngineExecute = new TestEngineExecute(); - TestAdaptor execute = testEngineExecute.execute(EngineFlow.testLink, engineRunData, engineContext); - System.out.println(execute); - } - @Component - public static class TestEngineNode extends AbstractEngineNode { + public static class TestEngineNode extends AbstractEngineNode { @Override - public Object invokeNode(EngineRunData nodeData, EngineContext engineContext) { + public TestAdaptor invokeNode(EngineRunData nodeData, EngineContext engineContext) { log.info("invokeNode::nodeData = {}, engineContext = {}", nodeData, engineContext); return new TestAdaptor("hello"); } @@ -65,7 +67,7 @@ public void afterInvoke(EngineRunData nodeData, EngineContext engineContext) { @Override public String resultKey() { - return "this_is_result_key"; + return RESULT_KEY; } } @@ -80,15 +82,18 @@ public static class EngineFlow { private static NodeChain testLink = new NodeChain(); static { - testLink.add(TestEngineNode.class, new NodeConf(true, 100)); + testLink.add("1", TestEngineNode.class, new NodeConf(true, 100)); + testLink.add("1", TestEngineNode.class, new NodeConf(true, 100)); + testLink.add("2", TestEngineNode.class, new NodeConf(true, 100)); + testLink.add("2", TestEngineNode.class, new NodeConf(false, 100)); } } - public class TestEngineExecute extends AbstractEngineExecute { + public static class TestEngineExecute extends AbstractEngineExecute { @Override public TestAdaptor assembleModel(EngineRunData engineRunData, EngineContext context) { - return (TestAdaptor)context.getAdaptorMap().get("this_is_result_key"); + return (TestAdaptor)context.getAdaptorMap().get(RESULT_KEY); } } } From d7d95a4e9e7d7d96f20a9f47979cc536eb65869d Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Wed, 29 May 2024 14:41:49 +0800 Subject: [PATCH 4/4] =?UTF-8?q?:bookmark:=20=E5=A2=9E=E5=8A=A0=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E5=BC=95=E6=93=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index b562c9d23..393245da4 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ 更新日志 +- 2024-05-29 流程引擎 - 2023-09-24 断点续传