Solving multithreading problems in trace propagation (Jaeger)

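The previous article covered trace propagation in a Spring MVC project.
Looking at Jaeger's implementation, much of it still relies on ThreadLocal. A ThreadLocal value is not visible in a child thread unless you explicitly arrange for it (for example with InheritableThreadLocal), and even then the result is often unsatisfactory. With a thread pool, whether the JDK's or Spring's, this does not solve the problem at all, because worker threads are reused rather than created per task.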
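
To make the limitation concrete, here is a minimal JDK-only sketch. The `PLAIN` and `INHERITED` holders and the `trace-1` value are hypothetical stand-ins for a tracer's active span, not part of Jaeger. It shows that a freshly created child thread sees an InheritableThreadLocal value, while a pool worker that already existed sees neither holder.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ThreadLocalPoolDemo {
    // Hypothetical holders standing in for the tracer's per-thread state.
    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    static final InheritableThreadLocal<String> INHERITED = new InheritableThreadLocal<>();

    /** Runs a read of the given holder on the pool and returns what the worker saw. */
    static String readOnPool(ExecutorService pool, ThreadLocal<String> holder) {
        Callable<String> task = holder::get;
        try {
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns {plain value on pool, inherited value on pool, inherited value on a new thread}. */
    static String[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Force the single worker thread to be created BEFORE any value is set.
            readOnPool(pool, PLAIN);
            PLAIN.set("trace-1");
            INHERITED.set("trace-1");

            String plainOnPool = readOnPool(pool, PLAIN);
            String inheritedOnPool = readOnPool(pool, INHERITED);

            // A thread created now copies the inheritable value from its parent.
            AtomicReference<String> onNewThread = new AtomicReference<>();
            Thread child = new Thread(() -> onNewThread.set(INHERITED.get()));
            child.start();
            child.join();
            return new String[] {plainOnPool, inheritedOnPool, onNewThread.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            PLAIN.remove();
            INHERITED.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

The reused worker was created before the values were set, so inheritance at thread creation time cannot help it. That is why the task itself has to carry the context.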

OpenTracing provides a concurrency package to support this:

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-concurrent</artifactId>
    <version>0.4.0</version>
</dependency>


It mainly contains:

  • TracedCallable
  • TracedExecutor
  • TracedExecutorService
  • TracedRunnable
  • TracedScheduledExecutorService

These are just thin wrappers that add tracer propagation on top of the original classes.

new Thread(new TracedRunnable(() -> {
    // TODO: business logic
}, GlobalTracer.get())).start();
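
The wrapping idea can be sketched without any tracing library. The following is an illustration only, not the actual TracedRunnable source: `ACTIVE` is a hypothetical ThreadLocal standing in for the tracer's active span, and `ContextRunnable` is a made-up name. The context is captured on the thread that constructs the wrapper and restored around the delegate on the thread that runs it.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ContextRunnableSketch {
    // Hypothetical holder standing in for the tracer's active span.
    static final ThreadLocal<String> ACTIVE = new ThreadLocal<>();

    static final class ContextRunnable implements Runnable {
        private final Runnable delegate;
        private final String captured;

        ContextRunnable(Runnable delegate) {
            this.delegate = delegate;
            // Captured on the constructing (parent) thread.
            this.captured = ACTIVE.get();
        }

        @Override
        public void run() {
            // Runs on the child thread: install the captured value, then restore.
            String previous = ACTIVE.get();
            ACTIVE.set(captured);
            try {
                delegate.run();
            } finally {
                if (previous == null) {
                    ACTIVE.remove();
                } else {
                    ACTIVE.set(previous);
                }
            }
        }
    }

    /** Sets a value on the caller thread and returns what a wrapped child thread observed. */
    static String runInChild(String value) {
        ACTIVE.set(value);
        AtomicReference<String> seen = new AtomicReference<>();
        Thread child = new Thread(new ContextRunnable(() -> seen.set(ACTIVE.get())));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ACTIVE.remove();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runInChild("span-42"));
    }
}
```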


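From Jaeger's point of view alone, the above is already enough to solve the multithreading problem. In practice, though, our business code has its own special scenarios, so some adaptation is still needed.
First, the default ThreadLocalScopeManager uses ThreadLocal.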

public class ThreadLocalScopeManager implements ScopeManager {
    final ThreadLocal<ThreadLocalScope> tlsScope = new ThreadLocal<ThreadLocalScope>();

    @Override
    public Scope activate(Span span) {
        return new ThreadLocalScope(this, span);
    }

    @Override
    public Span activeSpan() {
        ThreadLocalScope scope = tlsScope.get();
        return scope == null ? null : scope.span();
    }
}


Here, we first replace it with Alibaba's transmittable-thread-local:

public class TllThreadLocalScope implements Scope {
    private final TllThreadLocalScopeManager scopeManager;
    private final Span wrapped;
    private final TllThreadLocalScope toRestore;

    public TllThreadLocalScope(TllThreadLocalScopeManager scopeManager, Span wrapped) {
        this.scopeManager = scopeManager;
        this.wrapped = wrapped;
        this.toRestore = scopeManager.tlsScope.get();
        scopeManager.tlsScope.set(this);
    }

    @Override
    public void close() {
        if (scopeManager.tlsScope.get() != this) {
            // This shouldn't happen if users call methods in the expected order. Bail out.
            return;
        }

        scopeManager.tlsScope.set(toRestore);
    }

    Span span() {
        return wrapped;
    }
}
public class TllThreadLocalScopeManager implements ScopeManager {
    final TransmittableThreadLocal<TllThreadLocalScope> tlsScope = new TransmittableThreadLocal<TllThreadLocalScope>();

    @Override
    public Scope activate(Span span) {
        return new TllThreadLocalScope(this, span);
    }

    @Override
    public Span activeSpan() {
        TllThreadLocalScope scope = tlsScope.get();
        return scope == null ? null : scope.span();
    }
}
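
The constructor and `close()` of the scope class above form a small stack: each new scope remembers the one it replaced and puts it back when closed. Here is a simplified JDK-only sketch of that behavior. It assumes a plain ThreadLocal and a String in place of Span, and `SimpleScope` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class ScopeStackSketch {
    static final ThreadLocal<SimpleScope> CURRENT = new ThreadLocal<>();

    static final class SimpleScope implements AutoCloseable {
        final String span;
        private final SimpleScope toRestore;

        SimpleScope(String span) {
            this.span = span;
            this.toRestore = CURRENT.get(); // remember the scope being replaced
            CURRENT.set(this);
        }

        @Override
        public void close() {
            if (CURRENT.get() != this) {
                return; // closed out of order: bail out, as in the original
            }
            CURRENT.set(toRestore);
        }
    }

    static String activeSpan() {
        SimpleScope scope = CURRENT.get();
        return scope == null ? null : scope.span;
    }

    /** Records the active span before, inside, and after a nested scope. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        try (SimpleScope outer = new SimpleScope("outer")) {
            seen.add(activeSpan());
            try (SimpleScope inner = new SimpleScope("inner")) {
                seen.add(activeSpan());
            }
            seen.add(activeSpan());
        }
        seen.add(String.valueOf(activeSpan()));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [outer, inner, outer, null]
    }
}
```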


Then replace the value of wrappedScopeManager in our custom MDCScopeManager:

public static class Builder {
    private ScopeManager scopeManager = new TllThreadLocalScopeManager();
    private String mdcTraceIdKey = "traceId";
    private String mdcSpanIdKey = "spanId";
    private String mdcSampledKey = "sampled";
    private String mdcTenantIdKey = ActorConstants.X_TENANT_ID;
    private String mdcActorIdKey = ActorConstants.X_ACTOR_ID;

    public Builder withScopeManager(ScopeManager scopeManager) {
        this.scopeManager = scopeManager;
        return this;
    }

    public Builder withMDCTraceIdKey(String mdcTraceIdKey) {
        this.mdcTraceIdKey = mdcTraceIdKey;
        return this;
    }

    public Builder withMDCSpanIdKey(String mdcSpanIdKey) {
        this.mdcSpanIdKey = mdcSpanIdKey;
        return this;
    }

    public Builder withMDCSampledKey(String mdcSampledKey) {
        this.mdcSampledKey = mdcSampledKey;
        return this;
    }

    public Builder withMDCTenantIdKey(String mdcTenantIdKey) {
        this.mdcTenantIdKey = mdcTenantIdKey;
        return this;
    }

    public Builder withMDCActorIdKey(String mdcActorIdKey) {
        this.mdcActorIdKey = mdcActorIdKey;
        return this;
    }

    public MSMDCScopeManager build() {
        return new MSMDCScopeManager(this);
    }

}


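Next, following the approach of opentracing-concurrent, we implement our own set of wrappers so that they also work when Jaeger is not included.
Take TracedRunnable as an example: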

public class MSTracedRunnable implements Runnable {
    private final Runnable delegate;
    private final Tracer tracer;
    private final Span span;
    private final Actor actor;

    public MSTracedRunnable(Runnable delegate) {
        this(delegate, null, null, null);
    }

    public MSTracedRunnable(Runnable delegate, Tracer tracer, Span span) {
        this(delegate, tracer, span, null);
    }

    public MSTracedRunnable(Runnable delegate, Span span) {
        this(delegate, null, span, null);
    }

    public MSTracedRunnable(Runnable delegate, Actor actor) {
        this(delegate, null, null, actor);
    }

    public MSTracedRunnable(Runnable delegate, Tracer tracer, Span span, Actor actor) {
        this.delegate = delegate;
        this.tracer = Objects.nonNull(tracer) ? tracer : GlobalTracer.get();
        this.span = Objects.nonNull(span) ? span : Objects.nonNull(this.tracer) ? this.tracer.scopeManager().activeSpan() : null;
        this.actor = Objects.nonNull(actor) ? actor : ActorContextHolder.getActor();
    }

    @Override
    public void run() {
        if (ActorContextHolder.hasTracer()) {
            Scope scope = Objects.isNull(span) ? null : tracer.scopeManager().activate(span);
            try {
                delegate.run();
            } finally {
                if (Objects.nonNull(scope)) {
                    scope.close();
                }
            }
        } else {
            ActorContextHolder.setActor(actor);
            delegate.run();
        }

    }
}

 

Create a child thread to see the effect:

log.info("thread");
new Thread(new MSTracedRunnable(()->{
    log.info("thread");
})).start();

 

[2021-11-29 16:17:21.931] [xxx] [local] [c373968170c8d3bc] [1111] [actor-id]   INFO 2958 --- [  XNIO-1 task-1] c.b.s.u.r.p.c.impl.EmailUserController   : thread
[2021-11-29 16:17:21.934] [xxx] [local] [c373968170c8d3bc] [1111] [actor-id]   INFO 2958 --- [       Thread-4] c.b.s.u.r.p.c.impl.EmailUserController   : thread


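When using a thread pool, wrap it with our own classes.
For convenience this example uses the static factory methods. Normally you should construct the thread pool yourself, because pools created by the static factories use unbounded queues and can easily cause problems.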

@Bean(value = "tracedExecutorService")
public ExecutorService executorService(){
    ExecutorService executor = Executors.newFixedThreadPool(4); 
    return new MSTracedExecutorService(executor);
}

executorService.submit(()->{
    log.info("thread");
});

 

[2021-11-29 17:23:07.118] [xxx] [local] [c3570eae9d4ab49c] [1111] [actor-id]   INFO 3573 --- [  XNIO-1 task-1] c.b.s.u.r.p.c.impl.EmailUserController   : thread
[2021-11-29 17:23:07.122] [xxx] [local] [c3570eae9d4ab49c] [1111] [actor-id]   INFO 3573 --- [pool-2-thread-1] c.b.s.u.r.p.c.impl.EmailUserController   : thread
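
As noted above, the static factory was used only for convenience. A bounded pool could be built explicitly along these lines and then passed to the same wrapper. The pool size, queue capacity and rejection policy here are placeholder assumptions to be tuned per workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolSketch {
    /** Builds a fixed-size pool whose work queue is bounded (values are illustrative). */
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                4,                               // core pool size
                4,                               // maximum pool size
                0L, TimeUnit.MILLISECONDS,       // keep-alive for threads above core size
                new ArrayBlockingQueue<>(100),   // bounded queue instead of an unbounded one
                new ThreadPoolExecutor.CallerRunsPolicy()); // when full, run on the caller thread
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool();
        System.out.println("queue capacity: " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

With CallerRunsPolicy, a full queue slows the submitting thread down instead of growing memory without limit. Other policies may suit other workloads better.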

 

@Bean(value = "tracedExecutor")
public Executor executor(){
    ExecutorService executor = Executors.newFixedThreadPool(4);
    return new MSTracedExecutor(executor);
}

 

log.info("thread");
executor.execute(()->{
    log.info("thread");
});

 

[2021-11-29 17:15:26.993] [xxx] [local] [a3b048f08779ceab] [1111] [actor-id]   INFO 3504 --- [  XNIO-1 task-1] c.b.s.u.r.p.c.impl.EmailUserController   : thread
[2021-11-29 17:15:26.998] [xxx] [local] [a3b048f08779ceab] [1111] [actor-id]   INFO 3504 --- [pool-3-thread-1] c.b.s.u.r.p.c.impl.EmailUserController   : thread
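
The source of MSTracedExecutor is not shown in this article. As a rough idea of what wrapping at the point of execute can look like, here is a JDK-only sketch. `ContextExecutor` and the `TRACE_ID` holder are hypothetical names, and a real implementation would activate a span rather than set a string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ContextExecutorSketch {
    // Hypothetical holder standing in for the trace context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    static final class ContextExecutor implements Executor {
        private final Executor delegate;

        ContextExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            // Captured on the submitting thread, at the moment of execute().
            String captured = TRACE_ID.get();
            delegate.execute(() -> {
                String previous = TRACE_ID.get();
                TRACE_ID.set(captured);
                try {
                    command.run();
                } finally {
                    // Restore so a reused worker does not leak this task's context.
                    if (previous == null) {
                        TRACE_ID.remove();
                    } else {
                        TRACE_ID.set(previous);
                    }
                }
            });
        }
    }

    /** Submits a task through the wrapper and returns the trace id the worker observed. */
    static String traceIdSeenOnPool(String traceId) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Executor traced = new ContextExecutor(pool);
        CompletableFuture<String> seen = new CompletableFuture<>();
        TRACE_ID.set(traceId);
        try {
            traced.execute(() -> seen.complete(TRACE_ID.get()));
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceIdSeenOnPool("a3b048f08779ceab"));
    }
}
```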


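In summary, solving the multithreading problem in tracing comes down to two things: passing the ThreadLocal content along when a child thread is created, and, for thread pools, wrapping tasks at the point of execute so that the data is propagated. Inevitably, this means the team has to agree to use multithreading only in the prescribed way.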


About the author

落雁沙
An atypical programmer