diff --git a/pom.xml b/pom.xml index 5a8baf73..c2494854 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.github.lunasaw luna-common luna-common - 2.5.6 + 2.5.7 common is project which contains common utils https://github.com/lunasaw/luna-common diff --git a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java index abca9052..0c770147 100644 --- a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java +++ b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java @@ -1,11 +1,9 @@ package com.luna.common.thread; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import org.apache.commons.collections4.CollectionUtils; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,23 +14,56 @@ */ public class AsyncEngineUtils { - private static final Logger log = LoggerFactory.getLogger(AsyncEngineUtils.class); + private static final Logger log = LoggerFactory.getLogger(AsyncEngineUtils.class); - private static final int CORE_POOL_SIZE = 100; + private static final int CORE_POOL_SIZE = 200; - private static final int MAX_POOL_SIZE = 200; + private static final int MAX_POOL_SIZE = 200; - private static final ExecutorService executor; + private static final int KEEP_ALIVE_TIME = 60 * 5; + + private static final int QUEUE_CAPACITY = 1000; + + private static final long TIME_OUT = 300; + + private static final int MONITOR_PERIOD = 5; // 监控时间间隔,单位:s + + private static final ExecutorService EXECUTOR; + + private static final Runnable MONITOR_TASK = new Runnable() { + @Override + public void run() { + try { + ThreadPoolExecutor threadPool = (ThreadPoolExecutor)EXECUTOR; + int activeCount = threadPool.getActiveCount(); // 正在执行的任务数 + long completedTaskCount = threadPool.getCompletedTaskCount(); // 已完成任务数 + long totalTaskCount = threadPool.getTaskCount(); // 总任务数 + int queueSize = threadPool.getQueue().size(); + int coreSize = threadPool.getCorePoolSize(); + + log.info( + "total_task:{}, active_thread:{}, queue_size:{}, completed_thread:{}, coreSize:{}", + totalTaskCount, activeCount, queueSize, completedTaskCount, coreSize); + + } catch (Exception e) { + log.error("[SYSTEM-SafeGuard]Monitor thread run fail", e); + } + } + }; static { - executor = new ThreadPoolExecutor( + EXECUTOR = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, - 60 * 5L, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, - new SynchronousQueue<>(), + new LinkedBlockingDeque<>(QUEUE_CAPACITY), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); + + ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("AsyncEngine-Monitor", true)); + monitor.scheduleAtFixedRate(MONITOR_TASK, MONITOR_PERIOD, MONITOR_PERIOD, TimeUnit.SECONDS); + } /** @@ -41,11 +72,12 @@ public class AsyncEngineUtils { * @param tasks 任务 * @return T 任务返回值 */ + @SafeVarargs public static List concurrentExecute(Callable... tasks) { if (tasks == null || tasks.length == 0) { return Lists.newArrayList(); } - return concurrentExecute(-1, null, tasks); + return concurrentExecute(-1, null, Lists.newArrayList(tasks)); } /** @@ -55,12 +87,11 @@ public static List concurrentExecute(Callable... tasks) { * @return T 任务返回值 */ public static List concurrentExecute(List> tasks) { - if (CollectionUtils.isEmpty(tasks)) { return Lists.newArrayList(); } - return concurrentExecute(tasks.toArray(new Callable[tasks.size()])); + return concurrentExecute(-1, null, tasks); } /** @@ -71,23 +102,25 @@ public static List concurrentExecute(List> tasks) { * @param tasks 任务 * @return T 任务返回值 */ - public static List concurrentExecute(long timeout, TimeUnit unit, Callable... tasks) { - if (tasks == null || tasks.length == 0) { + public static List concurrentExecute(long timeout, TimeUnit unit, List> tasks) { + if (CollectionUtils.isEmpty(tasks)) { return Lists.newArrayList(); } List result = Lists.newArrayList(); try { - List> futures = timeout > 0 ? executor.invokeAll(Lists.newArrayList(tasks), timeout, unit) - : executor.invokeAll(Lists.newArrayList(tasks)); + List> futures = timeout > 0 ? EXECUTOR.invokeAll(tasks, timeout, unit) + : EXECUTOR.invokeAll(tasks); for (Future future : futures) { T t = null; try { - t = future.get(); + t = future.get(TIME_OUT, TimeUnit.MILLISECONDS); } catch (CancellationException e) { if (timeout > 0) { log.error("concurrentExecute some task timeout!"); } + } catch (TimeoutException tt) { + log.error("future.get() TimeoutException ", tt); } catch (Throwable tt) { log.error("future.get() Exception ", tt); } @@ -108,7 +141,7 @@ public static void execute(Runnable task) { if (task == null) { return; } - executor.submit(task); + EXECUTOR.submit(task); } public static void main(String[] args) { @@ -122,4 +155,10 @@ public static void main(String[] args) { List voids = concurrentExecute(list); System.out.println(voids); } + + public static void destroy() { + log.warn("start to stop thread pool"); + EXECUTOR.shutdown(); + log.warn("finish to stop thread pool"); + } } \ No newline at end of file diff --git a/src/main/java/com/luna/common/thread/NamedThreadFactory.java b/src/main/java/com/luna/common/thread/NamedThreadFactory.java new file mode 100644 index 00000000..45daa784 --- /dev/null +++ b/src/main/java/com/luna/common/thread/NamedThreadFactory.java @@ -0,0 +1,38 @@ +package com.luna.common.thread; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * thread 命名 + * + * @author luna + **/ +public class NamedThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final String namePrefix; + private final boolean isDaemon; + + public NamedThreadFactory(String name) { + this(name, false); + } + + public NamedThreadFactory(String prefix, boolean daemon) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + namePrefix = prefix + "-" + poolNumber.getAndIncrement() + "-thread-"; + isDaemon = daemon; + } + + public Thread newThread(Runnable runnable) { + Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0); + // Default value is parent thread's + t.setContextClassLoader(NamedThreadFactory.class.getClassLoader()); + t.setPriority(Thread.MAX_PRIORITY); + t.setDaemon(isDaemon); + return t; + } +} \ No newline at end of file