Skip to content

Commit

Permalink
up trace
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Jan 8, 2025
1 parent aeb81e0 commit a8e6eae
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 98 deletions.
6 changes: 3 additions & 3 deletions rxlib/src/main/java/org/rx/core/ForkJoinPoolWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public synchronized static void transform() {
};

static Runnable wrap(Runnable task) {
String traceId = ThreadPool.CTX_TRACE_ID.get();
String traceId = ThreadPool.traceId();
// log.info("wrap Runnable {}", traceId);
return () -> {
ThreadPool.startTrace(traceId);
Expand All @@ -105,7 +105,7 @@ static Runnable wrap(Runnable task) {
}

static <T> Callable<T> wrap(Callable<T> task) {
String traceId = ThreadPool.CTX_TRACE_ID.get();
String traceId = ThreadPool.traceId();
// log.info("wrap Callable {}", traceId);
return () -> {
ThreadPool.startTrace(traceId);
Expand All @@ -118,7 +118,7 @@ static <T> Callable<T> wrap(Callable<T> task) {
}

static <T> ForkJoinTask<T> wrap(ForkJoinTask<T> task) {
String traceId = ThreadPool.CTX_TRACE_ID.get();
String traceId = ThreadPool.traceId();
// log.info("wrap ForkJoinTask {}", traceId);
return ForkJoinTask.adapt(() -> {
ThreadPool.startTrace(traceId);
Expand Down
129 changes: 95 additions & 34 deletions rxlib/src/main/java/org/rx/core/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.rx.bean.FlagsEnum;
import org.rx.bean.IntWaterMark;
import org.rx.bean.RefCounter;
import org.rx.bean.ULID;
import org.rx.bean.*;
import org.rx.exception.InvalidException;
import org.rx.exception.TraceHandler;
import org.rx.util.function.Action;
import org.rx.util.function.Func;

import java.util.*;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -199,7 +199,7 @@ private Task(Callable<T> fn, FlagsEnum<RunFlag> flags, Object id) {
this.flags = flags;
this.id = id;
parent = flags.has(RunFlag.INHERIT_FAST_THREAD_LOCALS) ? InternalThreadLocalMap.getIfSet() : null;
traceId = CTX_TRACE_ID.get();
traceId = traceId();
}

@SneakyThrows
Expand Down Expand Up @@ -269,8 +269,25 @@ public FutureTaskAdapter(Runnable runnable, T result) {
//region static members
public static volatile Func<String> traceIdGenerator;
public static final Delegate<EventPublisher.StaticEventPublisher, String> onTraceIdChanged = Delegate.create();
static final ThreadLocal<Queue<String>> CTX_PARENT_TRACE_ID = new InheritableThreadLocal<>();
static final ThreadLocal<String> CTX_TRACE_ID = new InheritableThreadLocal<>();
static final ThreadLocal<LinkedList<Object>> CTX_TRACE_ID = new InheritableThreadLocal<LinkedList<Object>>() {
@Override
protected LinkedList<Object> initialValue() {
return new LinkedList<>();
}

@Override
protected LinkedList<Object> childValue(LinkedList<Object> parentValue) {
//Thread.currentThread()是parent线程
LinkedList<Object> c = new LinkedList<>();
Object peek = parentValue.peek();
if (peek != null) {
String tid = peek instanceof Tuple ? ((Tuple<String, Integer>) peek).left : (String) peek;
// log.debug("inherit {}", tid);
c.add(Tuple.of(tid, 0));
}
return c;
}
};
static final FastThreadLocal<Object> CTX_STACK_TRACE = new FastThreadLocal<>();
static final FastThreadLocal<Boolean> CONTINUE_FLAG = new FastThreadLocal<>();
private static final FastThreadLocal<Object> COMPLETION_RETURNED_VALUE = new FastThreadLocal<>();
Expand All @@ -284,7 +301,12 @@ public static String startTrace(String traceId) {

@SneakyThrows
public static String startTrace(String traceId, boolean requiresNew) {
String tid = CTX_TRACE_ID.get();
LinkedList<Object> queue = CTX_TRACE_ID.get();

Object peek = queue.peek();
Tuple<String, Integer> nestTid = null;
String tid = peek instanceof Tuple ? (nestTid = (Tuple<String, Integer>) peek).left : (String) peek;
byte f = 0;
if (tid == null) {
if (traceId != null) {
tid = traceId;
Expand All @@ -300,48 +322,87 @@ public static String startTrace(String traceId, boolean requiresNew) {
tid = ULID.randomULID().toBase64String();
}
}
CTX_TRACE_ID.set(tid);
} else if (traceId != null && !traceId.equals(tid)) {
if (!requiresNew) {
log.warn("The traceId already mapped to {} and can not set to {}", tid, traceId);
} else {
LinkedList<String> queue = (LinkedList<String>) CTX_PARENT_TRACE_ID.get();
if (queue == null) {
CTX_PARENT_TRACE_ID.set(queue = new LinkedList<>());
queue.addFirst(tid);
f = 1;
} else {
if (traceId != null && !traceId.equals(tid)) {
if (!requiresNew) {
log.warn("RTrace - The traceId already mapped to {} and can not set to {}", peek, traceId);
} else {
log.info("RTrace - Trace requires new to {} with parent {}", traceId, peek);
if (queue.size() > RxConfig.INSTANCE.threadPool.maxTraceDepth) {
log.warn("RTrace - Discard traceId {}", traceId);
} else {
queue.addFirst(tid = traceId);
}
f = 3;
}
} else {
if (queue.size() > RxConfig.INSTANCE.threadPool.maxTraceDepth) {
log.warn("RTrace - Discard traceId {}", peek);
} else {
queue.poll();
if (nestTid == null) {
nestTid = Tuple.of(tid, 1);
}
nestTid.right++;
queue.addFirst(nestTid);
}
queue.addFirst(tid);
CTX_TRACE_ID.set(traceId);
log.info("trace requires new to {} with parent {}", traceId, tid);
tid = traceId;
f = 2;
}
}
// log.info("trace start {}", tid);

onTraceIdChanged.invoke(EventPublisher.STATIC_QUIETLY_EVENT_INSTANCE, tid);
if (log.isDebugEnabled()) {
switch (f) {
case 1:
log.debug("RTrace - start new {}", queue);
break;
case 2:
log.debug("RTrace - start nest {}", queue);
break;
case 3:
log.debug("RTrace - start requires new {}", queue);
break;
}
}
return tid;
}

public static String traceId() {
return CTX_TRACE_ID.get();
Object peek = CTX_TRACE_ID.get().peek();
return peek instanceof Tuple ? ((Tuple<String, Integer>) peek).left : (String) peek;
}

@SneakyThrows
public static void endTrace() {
// log.info("trace end");
Queue<String> queue = CTX_PARENT_TRACE_ID.get();
String parentTid;
if (queue != null && (parentTid = queue.poll()) != null) {
CTX_TRACE_ID.set(parentTid);
if (queue.isEmpty()) {
CTX_PARENT_TRACE_ID.remove();
LinkedList<Object> queue = CTX_TRACE_ID.get();
if (queue.isEmpty()) {
log.warn("RTrace - not started");
return;
}

boolean next = false;
Object peek = queue.peek();
Tuple<String, Integer> nestTid = null;
String tid = peek instanceof Tuple ? (nestTid = (Tuple<String, Integer>) peek).left : (String) peek;
if (nestTid == null || --nestTid.right <= 0) {
queue.poll();
next = true;
}
log.debug("RTrace - end {} -> {}", queue, peek);

while (next) {
peek = queue.peek();
nestTid = null;
tid = peek instanceof Tuple ? (nestTid = (Tuple<String, Integer>) peek).left : (String) peek;
if (nestTid != null && nestTid.right == 0) {
queue.poll();
} else {
next = false;
}
} else {
parentTid = null;
CTX_TRACE_ID.remove();
}
onTraceIdChanged.invoke(EventPublisher.STATIC_QUIETLY_EVENT_INSTANCE, parentTid);
onTraceIdChanged.invoke(EventPublisher.STATIC_QUIETLY_EVENT_INSTANCE, tid);
}

public static <T> T completionReturnedValue() {
Expand Down
2 changes: 1 addition & 1 deletion rxlib/src/main/java/org/rx/core/WheelTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Task<T> implements TimerTask, TimeoutFuture<T> {
this.flags = flags;
this.id = id;
this.nextDelayFn = nextDelayFn;
traceId = ThreadPool.CTX_TRACE_ID.get();
traceId = ThreadPool.traceId();
}

@SneakyThrows
Expand Down
120 changes: 61 additions & 59 deletions rxlib/src/test/java/org/rx/core/TestCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.rx.bean.$.$;
import static org.rx.core.Extends.*;
Expand Down Expand Up @@ -197,41 +196,43 @@ public void inheritThreadLocal() {
ThreadPool pool = new ThreadPool(3, 1, new IntWaterMark(20, 40), "DEV");

//当线程池无空闲线程时,任务放置队列后,当队列任务执行时会带上正确的traceId
// ThreadPool.startTrace(null);
// for (int i = 0; i < 2; i++) {
// int finalI = i;
// pool.run(() -> {
// log.info("TRACE DELAY-1 {}", finalI);
// pool.run(() -> {
// log.info("TRACE DELAY-1_1 {}", finalI);
// sleep(oneSecond);
// });
// sleep(oneSecond);
// });
// log.info("TRACE DELAY MAIN {}", finalI);
// pool.run(() -> {
// log.info("TRACE DELAY-2 {}", finalI);
// sleep(oneSecond);
// });
// }
// ThreadPool.endTrace();
// sleep(8000);
//
// //WheelTimer(ScheduledExecutorService) 异步trace
// WheelTimer timer = Tasks.timer();
// ThreadPool.startTrace(null);
// for (int i = 0; i < 2; i++) {
// int finalI = i;
// timer.setTimeout(() -> {
// log.info("TRACE TIMER {}", finalI);
// sleep(oneSecond);
// }, oneSecond);
// log.info("TRACE TIMER MAIN {}", finalI);
// }
// ThreadPool.endTrace();
// sleep(4000);
ThreadPool.startTrace(null);
for (int i = 0; i < 2; i++) {
int finalI = i;
pool.run(() -> {
log.info("TRACE DELAY-1 {}", finalI);
pool.run(() -> {
log.info("TRACE DELAY-1_1 {}", finalI);
sleep(oneSecond);
});
sleep(oneSecond);
});
log.info("TRACE DELAY MAIN {}", finalI);
pool.run(() -> {
log.info("TRACE DELAY-2 {}", finalI);
sleep(oneSecond);
});
}
ThreadPool.endTrace();
sleep(5000);
System.out.println("---next---");

//CompletableFuture.xxAsync异步方法正确获取trace
//WheelTimer(ScheduledExecutorService) 异步trace
WheelTimer timer = Tasks.timer();
ThreadPool.startTrace(null);
for (int i = 0; i < 2; i++) {
int finalI = i;
timer.setTimeout(() -> {
log.info("TRACE TIMER {}", finalI);
sleep(oneSecond);
}, oneSecond);
log.info("TRACE TIMER MAIN {}", finalI);
}
ThreadPool.endTrace();
sleep(5000);
System.out.println("---next---");
//
// //CompletableFuture.xxAsync异步方法正确获取trace
// ThreadPool.startTrace(null);
// for (int i = 0; i < 2; i++) {
// int finalI = i;
Expand Down Expand Up @@ -269,30 +270,31 @@ public void inheritThreadLocal() {
// log.info("TRACE ALL_OF end");
// ThreadPool.endTrace();
// sleep(5000);

//parallelStream
ThreadPool.startTrace(null);
Arrays.toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallelStream().map(p -> {
//todo
Arrays.toList("a", "b", "c").parallelStream().map(x -> {
log.info("parallelStream {} -> {}", p, x);
return x.toString();
}).collect(Collectors.toList());
log.info("parallelStream {}", p);
return p.toString();
}).collect(Collectors.toList());
ThreadPool.endTrace();

//timer
ThreadPool.startTrace(null);
Tasks.timer().setTimeout(() -> {
log.info("TIMER 1");
pool.run(() -> {
log.info("TIMER 2");
});
}, d -> d > 5000 ? -1 : Math.max(d * 2, 1000), null, TimeoutFlag.PERIOD.flags());
ThreadPool.endTrace();
sleep(8000);
// System.out.println("---next---");
//
// //parallelStream
// ThreadPool.startTrace(null);
// Arrays.toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallelStream().map(p -> {
// //todo
// Arrays.toList("a", "b", "c").parallelStream().map(x -> {
// log.info("parallelStream {} -> {}", p, x);
// return x.toString();
// }).collect(Collectors.toList());
// log.info("parallelStream {}", p);
// return p.toString();
// }).collect(Collectors.toList());
// ThreadPool.endTrace();
//
// //timer
// ThreadPool.startTrace(null);
// Tasks.timer().setTimeout(() -> {
// log.info("TIMER 1");
// pool.run(() -> {
// log.info("TIMER 2");
// });
// }, d -> d > 5000 ? -1 : Math.max(d * 2, 1000), null, TimeoutFlag.PERIOD.flags());
// ThreadPool.endTrace();
// sleep(8000);
//
// //netty FastThreadLocal 支持继承
// FastThreadLocal<Integer> ftl = new FastThreadLocal<>();
Expand Down
3 changes: 2 additions & 1 deletion rxlib/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@

<logger name="io.netty" level="INFO"/>
<logger name="org.apache.sshd" level="INFO"/>
<logger name="org.rx.core.ThreadPool" level="INFO"/>
<!-- <logger name="org.rx.core.ThreadPool" level="INFO"/>-->
<logger name="org.rx.core.CpuWatchman" level="INFO"/>
<logger name="org.rx.io.EntityDatabaseImpl" level="INFO"/>
<!-- <logger name="org.rx.net" level="DEBUG"/>-->
<!-- <logger name="org.rx.net.socks.Socks5UdpRelayHandler" level="DEBUG"/>-->
Expand Down

0 comments on commit a8e6eae

Please sign in to comment.