From a8e6eaed3f789ca0e1e93d62bf491faa7062ae92 Mon Sep 17 00:00:00 2001 From: wxm <115806199+youfanx@users.noreply.github.com> Date: Wed, 8 Jan 2025 21:55:37 +0800 Subject: [PATCH] up trace --- .../java/org/rx/core/ForkJoinPoolWrapper.java | 6 +- .../src/main/java/org/rx/core/ThreadPool.java | 129 +++++++++++++----- .../src/main/java/org/rx/core/WheelTimer.java | 2 +- rxlib/src/test/java/org/rx/core/TestCore.java | 120 ++++++++-------- rxlib/src/test/resources/logback.xml | 3 +- 5 files changed, 162 insertions(+), 98 deletions(-) diff --git a/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java b/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java index a20482b2..3fee311f 100644 --- a/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java +++ b/rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java @@ -92,7 +92,7 @@ public synchronized static void transform() { }; static Runnable wrap(Runnable task) { - String traceId = ThreadPool.CTX_TRACE_ID.get(); + String traceId = ThreadPool.traceId(); // log.info("wrap Runnable {}", traceId); return () -> { ThreadPool.startTrace(traceId); @@ -105,7 +105,7 @@ static Runnable wrap(Runnable task) { } static Callable wrap(Callable task) { - String traceId = ThreadPool.CTX_TRACE_ID.get(); + String traceId = ThreadPool.traceId(); // log.info("wrap Callable {}", traceId); return () -> { ThreadPool.startTrace(traceId); @@ -118,7 +118,7 @@ static Callable wrap(Callable task) { } static ForkJoinTask wrap(ForkJoinTask task) { - String traceId = ThreadPool.CTX_TRACE_ID.get(); + String traceId = ThreadPool.traceId(); // log.info("wrap ForkJoinTask {}", traceId); return ForkJoinTask.adapt(() -> { ThreadPool.startTrace(traceId); diff --git a/rxlib/src/main/java/org/rx/core/ThreadPool.java b/rxlib/src/main/java/org/rx/core/ThreadPool.java index 4d0122e8..f024ac35 100644 --- a/rxlib/src/main/java/org/rx/core/ThreadPool.java +++ b/rxlib/src/main/java/org/rx/core/ThreadPool.java @@ -10,16 +10,16 @@ import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.rx.bean.FlagsEnum; -import org.rx.bean.IntWaterMark; -import org.rx.bean.RefCounter; -import org.rx.bean.ULID; +import org.rx.bean.*; import org.rx.exception.InvalidException; import org.rx.exception.TraceHandler; import org.rx.util.function.Action; import org.rx.util.function.Func; -import java.util.*; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -199,7 +199,7 @@ private Task(Callable fn, FlagsEnum flags, Object id) { this.flags = flags; this.id = id; parent = flags.has(RunFlag.INHERIT_FAST_THREAD_LOCALS) ? InternalThreadLocalMap.getIfSet() : null; - traceId = CTX_TRACE_ID.get(); + traceId = traceId(); } @SneakyThrows @@ -269,8 +269,25 @@ public FutureTaskAdapter(Runnable runnable, T result) { //region static members public static volatile Func traceIdGenerator; public static final Delegate onTraceIdChanged = Delegate.create(); - static final ThreadLocal> CTX_PARENT_TRACE_ID = new InheritableThreadLocal<>(); - static final ThreadLocal CTX_TRACE_ID = new InheritableThreadLocal<>(); + static final ThreadLocal> CTX_TRACE_ID = new InheritableThreadLocal>() { + @Override + protected LinkedList initialValue() { + return new LinkedList<>(); + } + + @Override + protected LinkedList childValue(LinkedList parentValue) { + //Thread.currentThread()是parent线程 + LinkedList c = new LinkedList<>(); + Object peek = parentValue.peek(); + if (peek != null) { + String tid = peek instanceof Tuple ? ((Tuple) peek).left : (String) peek; +// log.debug("inherit {}", tid); + c.add(Tuple.of(tid, 0)); + } + return c; + } + }; static final FastThreadLocal CTX_STACK_TRACE = new FastThreadLocal<>(); static final FastThreadLocal CONTINUE_FLAG = new FastThreadLocal<>(); private static final FastThreadLocal COMPLETION_RETURNED_VALUE = new FastThreadLocal<>(); @@ -284,7 +301,12 @@ public static String startTrace(String traceId) { @SneakyThrows public static String startTrace(String traceId, boolean requiresNew) { - String tid = CTX_TRACE_ID.get(); + LinkedList queue = CTX_TRACE_ID.get(); + + Object peek = queue.peek(); + Tuple nestTid = null; + String tid = peek instanceof Tuple ? (nestTid = (Tuple) peek).left : (String) peek; + byte f = 0; if (tid == null) { if (traceId != null) { tid = traceId; @@ -300,48 +322,87 @@ public static String startTrace(String traceId, boolean requiresNew) { tid = ULID.randomULID().toBase64String(); } } - CTX_TRACE_ID.set(tid); - } else if (traceId != null && !traceId.equals(tid)) { - if (!requiresNew) { - log.warn("The traceId already mapped to {} and can not set to {}", tid, traceId); - } else { - LinkedList queue = (LinkedList) CTX_PARENT_TRACE_ID.get(); - if (queue == null) { - CTX_PARENT_TRACE_ID.set(queue = new LinkedList<>()); + queue.addFirst(tid); + f = 1; + } else { + if (traceId != null && !traceId.equals(tid)) { + if (!requiresNew) { + log.warn("RTrace - The traceId already mapped to {} and can not set to {}", peek, traceId); + } else { + log.info("RTrace - Trace requires new to {} with parent {}", traceId, peek); + if (queue.size() > RxConfig.INSTANCE.threadPool.maxTraceDepth) { + log.warn("RTrace - Discard traceId {}", traceId); + } else { + queue.addFirst(tid = traceId); + } + f = 3; } + } else { if (queue.size() > RxConfig.INSTANCE.threadPool.maxTraceDepth) { + log.warn("RTrace - Discard traceId {}", peek); + } else { queue.poll(); + if (nestTid == null) { + nestTid = Tuple.of(tid, 1); + } + nestTid.right++; + queue.addFirst(nestTid); } - queue.addFirst(tid); - CTX_TRACE_ID.set(traceId); - log.info("trace requires new to {} with parent {}", traceId, tid); - tid = traceId; + f = 2; } } -// log.info("trace start {}", tid); + onTraceIdChanged.invoke(EventPublisher.STATIC_QUIETLY_EVENT_INSTANCE, tid); + if (log.isDebugEnabled()) { + switch (f) { + case 1: + log.debug("RTrace - start new {}", queue); + break; + case 2: + log.debug("RTrace - start nest {}", queue); + break; + case 3: + log.debug("RTrace - start requires new {}", queue); + break; + } + } return tid; } public static String traceId() { - return CTX_TRACE_ID.get(); + Object peek = CTX_TRACE_ID.get().peek(); + return peek instanceof Tuple ? ((Tuple) peek).left : (String) peek; } @SneakyThrows public static void endTrace() { -// log.info("trace end"); - Queue queue = CTX_PARENT_TRACE_ID.get(); - String parentTid; - if (queue != null && (parentTid = queue.poll()) != null) { - CTX_TRACE_ID.set(parentTid); - if (queue.isEmpty()) { - CTX_PARENT_TRACE_ID.remove(); + LinkedList queue = CTX_TRACE_ID.get(); + if (queue.isEmpty()) { + log.warn("RTrace - not started"); + return; + } + + boolean next = false; + Object peek = queue.peek(); + Tuple nestTid = null; + String tid = peek instanceof Tuple ? (nestTid = (Tuple) peek).left : (String) peek; + if (nestTid == null || --nestTid.right <= 0) { + queue.poll(); + next = true; + } + log.debug("RTrace - end {} -> {}", queue, peek); + + while (next) { + peek = queue.peek(); + nestTid = null; + tid = peek instanceof Tuple ? (nestTid = (Tuple) peek).left : (String) peek; + if (nestTid != null && nestTid.right == 0) { + queue.poll(); + } else { + next = false; } - } else { - parentTid = null; - CTX_TRACE_ID.remove(); } - onTraceIdChanged.invoke(EventPublisher.STATIC_QUIETLY_EVENT_INSTANCE, parentTid); + onTraceIdChanged.invoke(EventPublisher.STATIC_QUIETLY_EVENT_INSTANCE, tid); } public static T completionReturnedValue() { diff --git a/rxlib/src/main/java/org/rx/core/WheelTimer.java b/rxlib/src/main/java/org/rx/core/WheelTimer.java index eeed9774..bb3f1645 100644 --- a/rxlib/src/main/java/org/rx/core/WheelTimer.java +++ b/rxlib/src/main/java/org/rx/core/WheelTimer.java @@ -69,7 +69,7 @@ class Task implements TimerTask, TimeoutFuture { this.flags = flags; this.id = id; this.nextDelayFn = nextDelayFn; - traceId = ThreadPool.CTX_TRACE_ID.get(); + traceId = ThreadPool.traceId(); } @SneakyThrows diff --git a/rxlib/src/test/java/org/rx/core/TestCore.java b/rxlib/src/test/java/org/rx/core/TestCore.java index b3b18f57..4dcfb859 100644 --- a/rxlib/src/test/java/org/rx/core/TestCore.java +++ b/rxlib/src/test/java/org/rx/core/TestCore.java @@ -39,7 +39,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static org.rx.bean.$.$; import static org.rx.core.Extends.*; @@ -197,41 +196,43 @@ public void inheritThreadLocal() { ThreadPool pool = new ThreadPool(3, 1, new IntWaterMark(20, 40), "DEV"); //当线程池无空闲线程时,任务放置队列后,当队列任务执行时会带上正确的traceId -// ThreadPool.startTrace(null); -// for (int i = 0; i < 2; i++) { -// int finalI = i; -// pool.run(() -> { -// log.info("TRACE DELAY-1 {}", finalI); -// pool.run(() -> { -// log.info("TRACE DELAY-1_1 {}", finalI); -// sleep(oneSecond); -// }); -// sleep(oneSecond); -// }); -// log.info("TRACE DELAY MAIN {}", finalI); -// pool.run(() -> { -// log.info("TRACE DELAY-2 {}", finalI); -// sleep(oneSecond); -// }); -// } -// ThreadPool.endTrace(); -// sleep(8000); -// -// //WheelTimer(ScheduledExecutorService) 异步trace -// WheelTimer timer = Tasks.timer(); -// ThreadPool.startTrace(null); -// for (int i = 0; i < 2; i++) { -// int finalI = i; -// timer.setTimeout(() -> { -// log.info("TRACE TIMER {}", finalI); -// sleep(oneSecond); -// }, oneSecond); -// log.info("TRACE TIMER MAIN {}", finalI); -// } -// ThreadPool.endTrace(); -// sleep(4000); + ThreadPool.startTrace(null); + for (int i = 0; i < 2; i++) { + int finalI = i; + pool.run(() -> { + log.info("TRACE DELAY-1 {}", finalI); + pool.run(() -> { + log.info("TRACE DELAY-1_1 {}", finalI); + sleep(oneSecond); + }); + sleep(oneSecond); + }); + log.info("TRACE DELAY MAIN {}", finalI); + pool.run(() -> { + log.info("TRACE DELAY-2 {}", finalI); + sleep(oneSecond); + }); + } + ThreadPool.endTrace(); + sleep(5000); + System.out.println("---next---"); - //CompletableFuture.xxAsync异步方法正确获取trace + //WheelTimer(ScheduledExecutorService) 异步trace + WheelTimer timer = Tasks.timer(); + ThreadPool.startTrace(null); + for (int i = 0; i < 2; i++) { + int finalI = i; + timer.setTimeout(() -> { + log.info("TRACE TIMER {}", finalI); + sleep(oneSecond); + }, oneSecond); + log.info("TRACE TIMER MAIN {}", finalI); + } + ThreadPool.endTrace(); + sleep(5000); + System.out.println("---next---"); +// +// //CompletableFuture.xxAsync异步方法正确获取trace // ThreadPool.startTrace(null); // for (int i = 0; i < 2; i++) { // int finalI = i; @@ -269,30 +270,31 @@ public void inheritThreadLocal() { // log.info("TRACE ALL_OF end"); // ThreadPool.endTrace(); // sleep(5000); - - //parallelStream - ThreadPool.startTrace(null); - Arrays.toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallelStream().map(p -> { - //todo - Arrays.toList("a", "b", "c").parallelStream().map(x -> { - log.info("parallelStream {} -> {}", p, x); - return x.toString(); - }).collect(Collectors.toList()); - log.info("parallelStream {}", p); - return p.toString(); - }).collect(Collectors.toList()); - ThreadPool.endTrace(); - - //timer - ThreadPool.startTrace(null); - Tasks.timer().setTimeout(() -> { - log.info("TIMER 1"); - pool.run(() -> { - log.info("TIMER 2"); - }); - }, d -> d > 5000 ? -1 : Math.max(d * 2, 1000), null, TimeoutFlag.PERIOD.flags()); - ThreadPool.endTrace(); - sleep(8000); +// System.out.println("---next---"); +// +// //parallelStream +// ThreadPool.startTrace(null); +// Arrays.toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallelStream().map(p -> { +// //todo +// Arrays.toList("a", "b", "c").parallelStream().map(x -> { +// log.info("parallelStream {} -> {}", p, x); +// return x.toString(); +// }).collect(Collectors.toList()); +// log.info("parallelStream {}", p); +// return p.toString(); +// }).collect(Collectors.toList()); +// ThreadPool.endTrace(); +// +// //timer +// ThreadPool.startTrace(null); +// Tasks.timer().setTimeout(() -> { +// log.info("TIMER 1"); +// pool.run(() -> { +// log.info("TIMER 2"); +// }); +// }, d -> d > 5000 ? -1 : Math.max(d * 2, 1000), null, TimeoutFlag.PERIOD.flags()); +// ThreadPool.endTrace(); +// sleep(8000); // // //netty FastThreadLocal 支持继承 // FastThreadLocal ftl = new FastThreadLocal<>(); diff --git a/rxlib/src/test/resources/logback.xml b/rxlib/src/test/resources/logback.xml index d7175bd2..87d30536 100644 --- a/rxlib/src/test/resources/logback.xml +++ b/rxlib/src/test/resources/logback.xml @@ -33,7 +33,8 @@ - + +