From 95c50dcc3ab2477c91fb79366c9d960b73c2846b Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sun, 17 Mar 2024 14:08:16 +0800 Subject: [PATCH 1/4] =?UTF-8?q?:bookmark:=202.5.7=20=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../luna/common/thread/AsyncEngineUtils.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java index abca9052..53697f94 100644 --- a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java +++ b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java @@ -1,11 +1,9 @@ package com.luna.common.thread; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import org.apache.commons.collections4.CollectionUtils; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,11 +14,17 @@ */ public class AsyncEngineUtils { - private static final Logger log = LoggerFactory.getLogger(AsyncEngineUtils.class); + private static final Logger log = LoggerFactory.getLogger(AsyncEngineUtils.class); - private static final int CORE_POOL_SIZE = 100; + private static final int CORE_POOL_SIZE = 200; - private static final int MAX_POOL_SIZE = 200; + private static final int MAX_POOL_SIZE = 200; + + private static final int KEEP_ALIVE_TIME = 60 * 5; + + private static final int QUEUE_CAPACITY = 1000; + + private static final long TIME_OUT = 300; private static final ExecutorService executor; @@ -28,9 +32,9 @@ public class AsyncEngineUtils { executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, - 60 * 5L, + KEEP_ALIVE_TIME, TimeUnit.SECONDS, - new SynchronousQueue<>(), + new LinkedBlockingDeque<>(QUEUE_CAPACITY), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); } @@ -83,11 +87,13 @@ public static List concurrentExecute(long timeout, TimeUnit unit, Callabl for (Future future : futures) { T t = null; try { - t = future.get(); + t = future.get(TIME_OUT, TimeUnit.MILLISECONDS); } catch (CancellationException e) { if (timeout > 0) { log.error("concurrentExecute some task timeout!"); } + } catch (TimeoutException tt) { + log.error("future.get() TimeoutException ", tt); } catch (Throwable tt) { log.error("future.get() Exception ", tt); } From 4543903aa80ca5854f86b80ad8029767d30000bf Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Mon, 25 Mar 2024 09:56:10 +0800 Subject: [PATCH 2/4] =?UTF-8?q?:bookmark:=202.5.7=20=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/luna/common/thread/AsyncEngineUtils.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java index 53697f94..3fcf7b2a 100644 --- a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java +++ b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java @@ -1,5 +1,6 @@ package com.luna.common.thread; +import java.util.Arrays; import java.util.List; import java.util.concurrent.*; @@ -45,11 +46,12 @@ public class AsyncEngineUtils { * @param tasks 任务 * @return T 任务返回值 */ + @SafeVarargs public static List concurrentExecute(Callable... tasks) { if (tasks == null || tasks.length == 0) { return Lists.newArrayList(); } - return concurrentExecute(-1, null, tasks); + return concurrentExecute(-1, null, Lists.newArrayList(tasks)); } /** @@ -59,12 +61,11 @@ public static List concurrentExecute(Callable... tasks) { * @return T 任务返回值 */ public static List concurrentExecute(List> tasks) { - if (CollectionUtils.isEmpty(tasks)) { return Lists.newArrayList(); } - return concurrentExecute(tasks.toArray(new Callable[tasks.size()])); + return concurrentExecute(-1, null, tasks); } /** @@ -75,15 +76,15 @@ public static List concurrentExecute(List> tasks) { * @param tasks 任务 * @return T 任务返回值 */ - public static List concurrentExecute(long timeout, TimeUnit unit, Callable... tasks) { - if (tasks == null || tasks.length == 0) { + public static List concurrentExecute(long timeout, TimeUnit unit, List> tasks) { + if (CollectionUtils.isEmpty(tasks)) { return Lists.newArrayList(); } List result = Lists.newArrayList(); try { - List> futures = timeout > 0 ? executor.invokeAll(Lists.newArrayList(tasks), timeout, unit) - : executor.invokeAll(Lists.newArrayList(tasks)); + List> futures = timeout > 0 ? executor.invokeAll(tasks, timeout, unit) + : executor.invokeAll(tasks); for (Future future : futures) { T t = null; try { From bd4d919d9faeb45f520e8b033d08b9bbb9e8cfe0 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sun, 7 Apr 2024 16:29:18 +0800 Subject: [PATCH 3/4] =?UTF-8?q?:bookmark:=202.5.7=20=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../luna/common/thread/AsyncEngineUtils.java | 44 ++++++++++++++++--- .../common/thread/NamedThreadFactory.java | 38 ++++++++++++++++ 3 files changed, 77 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/luna/common/thread/NamedThreadFactory.java diff --git a/pom.xml b/pom.xml index 5a8baf73..c2494854 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.github.lunasaw luna-common luna-common - 2.5.6 + 2.5.7 common is project which contains common utils https://github.com/lunasaw/luna-common diff --git a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java index 3fcf7b2a..f48234f6 100644 --- a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java +++ b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java @@ -1,6 +1,5 @@ package com.luna.common.thread; -import java.util.Arrays; import java.util.List; import java.util.concurrent.*; @@ -27,10 +26,33 @@ public class AsyncEngineUtils { private static final long TIME_OUT = 300; - private static final ExecutorService executor; + private static final int MONITOR_PERIOD = 5; // 监控时间间隔,单位:s + + private static final ExecutorService EXECUTOR; + + private static final Runnable MONITOR_TASK = new Runnable() { + @Override + public void run() { + try { + ThreadPoolExecutor threadPool = (ThreadPoolExecutor)EXECUTOR; + int activeCount = threadPool.getActiveCount(); // 正在执行的任务数 + long completedTaskCount = threadPool.getCompletedTaskCount(); // 已完成任务数 + long totalTaskCount = threadPool.getTaskCount(); // 总任务数 + int queueSize = threadPool.getQueue().size(); + int coreSize = threadPool.getCorePoolSize(); + + log.info( + "total_task:{}, active_thread:{}, queue_size:{}, completed_thread:{}, coreSize:{}", + totalTaskCount, activeCount, queueSize, completedTaskCount, coreSize); + + } catch (Exception e) { + log.error("[SYSTEM-SafeGuard]Monitor thread run fail", e); + } + } + }; static { - executor = new ThreadPoolExecutor( + EXECUTOR = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, @@ -38,6 +60,10 @@ public class AsyncEngineUtils { new LinkedBlockingDeque<>(QUEUE_CAPACITY), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); + + ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("AsyncEngine-Monitor", true)); + monitor.scheduleAtFixedRate(MONITOR_TASK, MONITOR_PERIOD, MONITOR_PERIOD, TimeUnit.SECONDS); + } /** @@ -83,8 +109,8 @@ public static List concurrentExecute(long timeout, TimeUnit unit, List result = Lists.newArrayList(); try { - List> futures = timeout > 0 ? executor.invokeAll(tasks, timeout, unit) - : executor.invokeAll(tasks); + List> futures = timeout > 0 ? EXECUTOR.invokeAll(tasks, timeout, unit) + : EXECUTOR.invokeAll(tasks); for (Future future : futures) { T t = null; try { @@ -115,7 +141,7 @@ public static void execute(Runnable task) { if (task == null) { return; } - executor.submit(task); + EXECUTOR.submit(task); } public static void main(String[] args) { @@ -129,4 +155,10 @@ public static void main(String[] args) { List voids = concurrentExecute(list); System.out.println(voids); } + + public void destroy() { + log.warn("start to stop thread pool"); + EXECUTOR.shutdown(); + log.warn("finish to stop thread pool"); + } } \ No newline at end of file diff --git a/src/main/java/com/luna/common/thread/NamedThreadFactory.java b/src/main/java/com/luna/common/thread/NamedThreadFactory.java new file mode 100644 index 00000000..45daa784 --- /dev/null +++ b/src/main/java/com/luna/common/thread/NamedThreadFactory.java @@ -0,0 +1,38 @@ +package com.luna.common.thread; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * thread 命名 + * + * @author luna + **/ +public class NamedThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final String namePrefix; + private final boolean isDaemon; + + public NamedThreadFactory(String name) { + this(name, false); + } + + public NamedThreadFactory(String prefix, boolean daemon) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + namePrefix = prefix + "-" + poolNumber.getAndIncrement() + "-thread-"; + isDaemon = daemon; + } + + public Thread newThread(Runnable runnable) { + Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0); + // Default value is parent thread's + t.setContextClassLoader(NamedThreadFactory.class.getClassLoader()); + t.setPriority(Thread.MAX_PRIORITY); + t.setDaemon(isDaemon); + return t; + } +} \ No newline at end of file From be865cc4d7ab98fbd2f8ff624f910687cb954fa5 Mon Sep 17 00:00:00 2001 From: chenzhangyue <15696756582@163.com> Date: Sun, 7 Apr 2024 16:29:55 +0800 Subject: [PATCH 4/4] =?UTF-8?q?:bookmark:=202.5.7=20=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/luna/common/thread/AsyncEngineUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java index f48234f6..0c770147 100644 --- a/src/main/java/com/luna/common/thread/AsyncEngineUtils.java +++ b/src/main/java/com/luna/common/thread/AsyncEngineUtils.java @@ -156,7 +156,7 @@ public static void main(String[] args) { System.out.println(voids); } - public void destroy() { + public static void destroy() { log.warn("start to stop thread pool"); EXECUTOR.shutdown(); log.warn("finish to stop thread pool");