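The previous article covered trace propagation in a Spring MVC project.
Looking at Jaeger's implementation, much of it is still built on ThreadLocal. A ThreadLocal value does not reach a child thread unless you explicitly arrange for it (for example with InheritableThreadLocal), and even then the result is not very reliable. With a thread pool, whether it is the JDK's or Spring's, the problem is not solved at all, because the worker threads are reused instead of being created per task.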
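To make the problem concrete, here is a minimal sketch that uses only the JDK. The class and variable names are made up for illustration and are not Jaeger or OpenTracing code. It shows that a plain ThreadLocal is invisible to a child thread, that InheritableThreadLocal copies the value when a thread is created, and that a pooled worker therefore keeps the value it inherited when it was first started.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative names only; none of this is Jaeger or OpenTracing code.
class ThreadLocalPropagationDemo {
    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    static final InheritableThreadLocal<String> INHERITED = new InheritableThreadLocal<>();

    /** Returns what a new child thread, then a reused pool worker, can see. */
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            PLAIN.set("trace-1");
            INHERITED.set("trace-1");

            // A freshly created child thread: the plain value is lost,
            // the inheritable one is copied at thread creation.
            String[] child = new String[2];
            Thread t = new Thread(() -> {
                child[0] = PLAIN.get();
                child[1] = INHERITED.get();
            });
            t.start();
            t.join();

            Callable<String> read = INHERITED::get;
            // The single worker is created by this first submit and inherits "trace-1".
            String first = pool.submit(read).get();

            // The caller moves on to another trace, but the worker is reused as-is.
            INHERITED.set("trace-2");
            String second = pool.submit(read).get();

            return Arrays.asList(child[0], child[1], first, second);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            PLAIN.remove();
            INHERITED.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [null, trace-1, trace-1, trace-1]
    }
}
```

The last element is the interesting one: the caller is already on "trace-2", yet the pooled worker still reports "trace-1".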
OpenTracing provides a concurrency package to support this:
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-concurrent</artifactId>
    <version>0.4.0</version>
</dependency>
It mainly contains:
- TracedCallable
- TracedExecutor
- TracedExecutorService
- TracedRunnable
- TracedScheduledExecutorService
These are just thin wrappers that add tracer propagation around the delegate.
new Thread(new TracedRunnable(() -> {
    // todo
}, GlobalTracer.get())).start();
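What such a wrapper does can be sketched with the JDK alone. The following is not the library's source: ACTIVE is a hypothetical stand-in for the tracer's active span, and ContextRunnable is an illustrative name. The idea is to capture the context in the constructor, which runs on the submitting thread, and to install it around the delegate inside run().

```java
import java.util.concurrent.atomic.AtomicReference;

// A JDK-only sketch of the wrapper idea; names are hypothetical.
class ContextRunnableSketch {
    /** Hypothetical stand-in for the tracer's "active span" storage. */
    static final ThreadLocal<String> ACTIVE = new ThreadLocal<>();

    static final class ContextRunnable implements Runnable {
        private final Runnable delegate;
        private final String captured;

        ContextRunnable(Runnable delegate) {
            this.delegate = delegate;
            // The constructor runs on the submitting thread, so this reads the caller's context.
            this.captured = ACTIVE.get();
        }

        @Override
        public void run() {
            // run() executes on the other thread: install the captured value, then restore.
            String previous = ACTIVE.get();
            ACTIVE.set(captured);
            try {
                delegate.run();
            } finally {
                if (previous == null) {
                    ACTIVE.remove();
                } else {
                    ACTIVE.set(previous);
                }
            }
        }
    }

    /** Sets a context on the caller and returns what the child thread observed. */
    static String runInChild(String traceId) {
        ACTIVE.set(traceId);
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            Thread t = new Thread(new ContextRunnable(() -> seen.set(ACTIVE.get())));
            t.start();
            t.join();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ACTIVE.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInChild("c373968170c8d3bc"));
    }
}
```

Because the value travels inside the wrapper object rather than through thread inheritance, the same approach works whether the task runs on a new thread or on a reused one.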
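If we only consider Jaeger, the parts above are already enough to handle multi-threaded use. In practice, though, our business code brings its own special scenarios, so some customization is still needed.
First, the default ThreadLocalScopeManager uses a ThreadLocal: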
public class ThreadLocalScopeManager implements ScopeManager {
    final ThreadLocal<ThreadLocalScope> tlsScope = new ThreadLocal<ThreadLocalScope>();

    @Override
    public Scope activate(Span span) {
        return new ThreadLocalScope(this, span);
    }

    @Override
    public Span activeSpan() {
        ThreadLocalScope scope = tlsScope.get();
        return scope == null ? null : scope.span();
    }
}
Here we first replace it with Alibaba's transmittable-thread-local:
import com.alibaba.ttl.TransmittableThreadLocal;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import io.opentracing.Span;

public class TllThreadLocalScope implements Scope {
    private final TllThreadLocalScopeManager scopeManager;
    private final Span wrapped;
    private final TllThreadLocalScope toRestore;

    public TllThreadLocalScope(TllThreadLocalScopeManager scopeManager, Span wrapped) {
        this.scopeManager = scopeManager;
        this.wrapped = wrapped;
        // Remember the scope that was active before, so close() can put it back.
        this.toRestore = scopeManager.tlsScope.get();
        scopeManager.tlsScope.set(this);
    }

    @Override
    public void close() {
        if (scopeManager.tlsScope.get() != this) {
            // This shouldn't happen if users call methods in the expected order. Bail out.
            return;
        }
        scopeManager.tlsScope.set(toRestore);
    }

    Span span() {
        return wrapped;
    }
}

public class TllThreadLocalScopeManager implements ScopeManager {
    // Same as the default manager, except that the holder is a TransmittableThreadLocal.
    final TransmittableThreadLocal<TllThreadLocalScope> tlsScope = new TransmittableThreadLocal<TllThreadLocalScope>();

    @Override
    public Scope activate(Span span) {
        return new TllThreadLocalScope(this, span);
    }

    @Override
    public Span activeSpan() {
        TllThreadLocalScope scope = tlsScope.get();
        return scope == null ? null : scope.span();
    }
}
Then replace the value of wrappedScopeManager in our custom MDCScopeManager, by making TllThreadLocalScopeManager the Builder's default:
public static class Builder {
    private ScopeManager scopeManager = new TllThreadLocalScopeManager();
    private String mdcTraceIdKey = "traceId";
    private String mdcSpanIdKey = "spanId";
    private String mdcSampledKey = "sampled";
    private String mdcTenantIdKey = ActorConstants.X_TENANT_ID;
    private String mdcActorIdKey = ActorConstants.X_ACTOR_ID;

    public Builder withScopeManager(ScopeManager scopeManager) {
        this.scopeManager = scopeManager;
        return this;
    }

    public Builder withMDCTraceIdKey(String mdcTraceIdKey) {
        this.mdcTraceIdKey = mdcTraceIdKey;
        return this;
    }

    public Builder withMDCSpanIdKey(String mdcSpanIdKey) {
        this.mdcSpanIdKey = mdcSpanIdKey;
        return this;
    }

    public Builder withMDCSampledKey(String mdcSampledKey) {
        this.mdcSampledKey = mdcSampledKey;
        return this;
    }

    public Builder withMDCTenantIdKey(String mdcTenantIdKey) {
        this.mdcTenantIdKey = mdcTenantIdKey;
        return this;
    }

    public Builder withMDCActorIdKey(String mdcActorIdKey) {
        this.mdcActorIdKey = mdcActorIdKey;
        return this;
    }

    public MSMDCScopeManager build() {
        return new MSMDCScopeManager(this);
    }
}
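Next, following the approach of opentracing-concurrent, we implement our own set of wrappers so that the case where Jaeger is not included is also supported.
Take TracedRunnable as an example: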
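import java.util.Objects;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;

// Actor and ActorContextHolder are project-specific classes.
public class MSTracedRunnable implements Runnable {
    private final Runnable delegate;
    private final Tracer tracer;
    private final Span span;
    private final Actor actor;

    public MSTracedRunnable(Runnable delegate) {
        this(delegate, null, null, null);
    }

    public MSTracedRunnable(Runnable delegate, Tracer tracer, Span span) {
        this(delegate, tracer, span, null);
    }

    public MSTracedRunnable(Runnable delegate, Span span) {
        this(delegate, null, span, null);
    }

    public MSTracedRunnable(Runnable delegate, Actor actor) {
        this(delegate, null, null, actor);
    }

    public MSTracedRunnable(Runnable delegate, Tracer tracer, Span span, Actor actor) {
        this.delegate = delegate;
        // Everything is captured here, on the submitting thread.
        this.tracer = Objects.nonNull(tracer) ? tracer : GlobalTracer.get();
        this.span = Objects.nonNull(span) ? span
                : Objects.nonNull(this.tracer) ? this.tracer.scopeManager().activeSpan() : null;
        this.actor = Objects.nonNull(actor) ? actor : ActorContextHolder.getActor();
    }

    @Override
    public void run() {
        if (ActorContextHolder.hasTracer()) {
            // Re-activate the captured span on the executing thread for the duration of the task.
            Scope scope = Objects.isNull(span) ? null : tracer.scopeManager().activate(span);
            try {
                delegate.run();
            } finally {
                if (Objects.nonNull(scope)) {
                    scope.close();
                }
            }
        } else {
            // No tracer: hand the actor over directly and put the thread's previous value back
            // afterwards, so a pooled thread does not keep a stale actor.
            // Assumes setActor accepts whatever getActor returned, including null.
            Actor previous = ActorContextHolder.getActor();
            ActorContextHolder.setActor(actor);
            try {
                delegate.run();
            } finally {
                ActorContextHolder.setActor(previous);
            }
        }
    }
}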
Create a child thread to see the effect:
log.info("thread");
new Thread(new MSTracedRunnable(() -> {
    log.info("thread");
})).start();
[2021-11-29 16:17:21.931] [xxx] [local] [c373968170c8d3bc] [1111] [actor-id] INFO 2958 --- [ XNIO-1 task-1] c.b.s.u.r.p.c.impl.EmailUserController : thread
[2021-11-29 16:17:21.934] [xxx] [local] [c373968170c8d3bc] [1111] [actor-id] INFO 2958 --- [ Thread-4] c.b.s.u.r.p.c.impl.EmailUserController : thread
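When using a thread pool, wrap it with our own classes.
The static factory method is used here only for convenience. In general you should construct the thread pool yourself: the pools created by the static factory methods are unbounded (Executors.newFixedThreadPool, for instance, uses an unbounded queue), which easily leads to problems.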
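For reference, a pool constructed by hand might look like the sketch below. The pool size, queue capacity, and rejection policy are illustrative assumptions, not values taken from this project, and should be tuned to the actual workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of an explicitly bounded pool; all numbers are placeholder assumptions.
class BoundedPoolSketch {
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                4,                               // core pool size
                4,                               // maximum pool size
                0L, TimeUnit.MILLISECONDS,       // keep-alive for threads above the core size
                new ArrayBlockingQueue<>(100),   // bounded queue: at most 100 waiting tasks
                new ThreadPoolExecutor.CallerRunsPolicy()); // when full, run on the submitting thread
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool();
        System.out.println(pool.getQueue().remainingCapacity()); // 100
        pool.shutdown();
    }
}
```

The bean definitions that follow keep the Executors.newFixedThreadPool shortcut for brevity.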
@Bean(value = "tracedExecutorService")
public ExecutorService executorService() {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    return new MSTracedExecutorService(executor);
}

executorService.submit(() -> {
    log.info("thread");
});
[2021-11-29 17:23:07.118] [xxx] [local] [c3570eae9d4ab49c] [1111] [actor-id] INFO 3573 --- [ XNIO-1 task-1] c.b.s.u.r.p.c.impl.EmailUserController : thread
[2021-11-29 17:23:07.122] [xxx] [local] [c3570eae9d4ab49c] [1111] [actor-id] INFO 3573 --- [pool-2-thread-1] c.b.s.u.r.p.c.impl.EmailUserController : thread
@Bean(value = "tracedExecutor")
public Executor executor() {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    return new MSTracedExecutor(executor);
}

log.info("thread");
executor.execute(() -> {
    log.info("thread");
});
[2021-11-29 17:15:26.993] [xxx] [local] [a3b048f08779ceab] [1111] [actor-id] INFO 3504 --- [ XNIO-1 task-1] c.b.s.u.r.p.c.impl.EmailUserController : thread
[2021-11-29 17:15:26.998] [xxx] [local] [a3b048f08779ceab] [1111] [actor-id] INFO 3504 --- [pool-3-thread-1] c.b.s.u.r.p.c.impl.EmailUserController : thread
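In summary, solving tracing across threads comes down to two things: when a child thread is created, pass the ThreadLocal contents along; when a thread pool is used, wrap the task at the point where execute is called so that the data travels with it. Inevitably, this means the team has to agree to handle multi-threading in the prescribed way.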
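The thread pool half of that summary can be sketched with the JDK alone. This is not the MSTracedExecutor from this project, whose source is not shown here: TRACE_ID is a hypothetical stand-in for the real trace context, and ContextExecutor is an illustrative name. The wrapper reads the context inside execute(), on the caller's thread, and installs it around the task on the worker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch of wrapping tasks at the point of execute(); names are hypothetical.
class ContextExecutorSketch {
    /** Hypothetical stand-in for the real trace context. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    static final class ContextExecutor implements Executor {
        private final Executor delegate;

        ContextExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            String captured = TRACE_ID.get(); // read on the caller's thread
            delegate.execute(() -> {
                String previous = TRACE_ID.get();
                TRACE_ID.set(captured);
                try {
                    task.run();
                } finally {
                    // Leave the worker as it was found, so nothing leaks into the next task.
                    if (previous == null) {
                        TRACE_ID.remove();
                    } else {
                        TRACE_ID.set(previous);
                    }
                }
            });
        }
    }

    /** Runs two traced tasks and one unwrapped probe on the same single worker. */
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Executor traced = new ContextExecutor(pool);
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        try {
            for (String id : new String[] {"trace-1", "trace-2"}) {
                TRACE_ID.set(id);
                traced.execute(() -> seen.add(TRACE_ID.get()));
            }
            // An unwrapped task on the same worker sees no leftover context.
            Runnable probe = () -> seen.add(String.valueOf(TRACE_ID.get()));
            pool.submit(probe).get();
            return seen;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [trace-1, trace-2, null]
    }
}
```

The unwrapped probe at the end is also why the team convention matters: a task submitted without the wrapper simply runs without any trace context.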