spring mvc工程下设计上下文链路的贯穿与传递

前言
在项目中,我们经常需要让执行上下文贯穿整个请求链路。要实现这个需求,需要考虑以下几点。
1.如果请求跨服务,怎么在服务之间传递?
2.异步业务中,如何实现链路上下文的贯穿?
3.如果是简单单体应用,要不要考虑轻量方案?

单体应用中如何实现链路传递
在只考虑spring mvc的情况下,我们基本只需要通过ThreadLocal就可以实现这个需求。但是,如果业务中有需要使用线程池的地方,就需要自己实现一个线程池的实现类,或者在execute中自己去处理ThreadLocal的赋值和清空。因为InheritableThreadLocal虽然可以把值传递给子线程,但线程池中的线程是复用的,并不是当前线程的子线程,这时候就需要在任务执行时自己给执行线程赋值,并在执行结束后清空。

在多服务之间进行链路传递
多服务之间的链路传递,最麻烦的就是无法确定服务之间的调用方式。除非把用到的每一种调用方式都各自实现一遍,但这样做成本比较高。考虑到以上因素,我们选择jaeger来作为链路传递的实现方式。

可以看到,基本上主流的调用传递方式都已经有。

如何引入jaeger,这个在之前的文章中已经介绍过。https://mp.weixin.qq.com/s/t2SWMubMUirOObRYhtg5Jg

我们先写一个context的包
引入依赖

<!-- OpenTracing utilities (GlobalTracer), used by ActorContextHolder to reach the active span. -->
<dependency>
    <groupId>io.opentracing</groupId>
    <artifactId>opentracing-util</artifactId>
    <version>0.33.0</version>
</dependency>
<!-- TransmittableThreadLocal, used by ActorContextHolder for the thread-local fallback storage. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.12.2</version>
</dependency>

创建一个Actor对象,来存放上下文的内容。

/**
 * Value object describing who is acting in the current execution context.
 *
 * <p>Accessors, constructors and the builder are generated by Lombok; only
 * {@link #toString()} is written by hand.
 */
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class Actor {
    private String role;
    private String actorId;
    private String tenantId;
    private Instant time;

    /**
     * Renders every field in declaration order; unset fields are printed as {@code null}.
     *
     * @return a single-line description of the form {@code Actor{role='..', actorId='..', ...}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Actor{");
        text.append("role='").append(role).append('\'');
        text.append(", actorId='").append(actorId).append('\'');
        text.append(", tenantId='").append(tenantId).append('\'');
        text.append(", time=").append(time);
        text.append('}');
        return text.toString();
    }
}


定义一个常量类

/**
 * Key names shared by the actor-context classes.
 *
 * <p>The {@code X_*} values are used as baggage item keys on spans, the
 * {@code MDC_*} values as logging MDC keys.
 */
public final class ActorConstants {
    public static final String X_TRACE_ID = "X-MS-TraceId";
    public static final String X_ACTOR_ID = "X-MS-ActorId";
    public static final String X_TENANT_ID = "X-MS-TenantId";
    public static final String X_TIME = "X-MS-Time";

    /**
     * Legacy spelling of {@link #X_TIME}, kept so existing callers still compile.
     *
     * @deprecated use {@link #X_TIME}
     */
    @Deprecated
    public static final String X_Time = X_TIME;

    public static final String MDC_TRACE_ID = "traceId";
    public static final String MDC_ACTOR_ID = "actorId";
    public static final String MDC_TENANT_ID = "tenantId";

    /** Constants holder; not meant to be instantiated. */
    private ActorConstants() {
    }
}


一个具体的处理类

/**
 * Static access point for the {@link Actor} of the current execution context.
 *
 * <p>Every public operation chooses its storage through {@link #hasTracer()}:
 * <ul>
 *   <li>tracer mode: actor id and tenant id are stored as baggage items on the active span;</li>
 *   <li>local mode: the actor is stored in a {@link TransmittableThreadLocal} and its ids
 *       are mirrored into the logging MDC.</li>
 * </ul>
 *
 * <p>{@link #addActorsFirst(Actor)} / {@link #removeActorsFirst()} maintain a stack of
 * previous actors so that an actor can be switched temporarily, see {@code runAs}.
 */
public class ActorContextHolder {
    private static final Logger logger = LoggerFactory.getLogger(ActorContextHolder.class);

    /**
     * Marker pushed onto the stack when there was no previous actor.
     * {@link LinkedBlockingDeque} rejects {@code null} elements, so "no actor" needs a
     * stand-in; it is recognised by identity, never by {@code equals}.
     */
    private static final Actor NO_ACTOR = new Actor();

    private static final TransmittableThreadLocal<Actor> actorHolder = new TransmittableThreadLocal<>();
    private static final TransmittableThreadLocal<LinkedBlockingDeque<Actor>> actorQueue = new TransmittableThreadLocal<>();

    private ActorContextHolder() {
    }

    /**
     * Makes {@code actor} the current actor.
     *
     * @param actor the new actor; {@code null} clears the current actor (same as {@link #resetActor()})
     */
    public static void setActor(Actor actor) {
        if (Objects.isNull(actor)) {
            // Neither back-end can store "no actor" as a value: the tracer path dereferences it
            // and the local path would leave stale MDC entries behind.
            resetActor();
            return;
        }
        if (hasTracer()) {
            setTracerActor(actor);
        } else {
            setLocalActor(actor);
        }
    }

    /**
     * Returns the current actor.
     *
     * @return the current actor, or {@code null} when none is available; in tracer mode the
     *         returned actor only carries {@code actorId} and {@code tenantId}
     */
    public static Actor getActor() {
        if (hasTracer()) {
            return getTracerActor();
        } else {
            return getLocalActor();
        }
    }

    /** Clears the current actor from whichever back-end is in use. */
    public static void resetActor() {
        if (hasTracer()) {
            resetTracerActor();
        } else {
            resetLocalActor();
        }
    }

    /**
     * Remembers the current actor on the stack and makes {@code actor} current.
     * Must be balanced by a later {@link #removeActorsFirst()}.
     *
     * @param actor the actor to switch to; {@code null} is ignored and nothing is pushed
     */
    public static void addActorsFirst(Actor actor) {
        if (Objects.isNull(actor)) {
            return;
        }
        LinkedBlockingDeque<Actor> actors = actorQueue.get();
        if (Objects.isNull(actors)) {
            actors = new LinkedBlockingDeque<>();
        }
        Actor previousActor = getActor();
        // The deque throws NullPointerException on null elements, so "no previous actor"
        // is recorded with the NO_ACTOR marker instead.
        actors.addFirst(Objects.isNull(previousActor) ? NO_ACTOR : previousActor);
        actorQueue.set(actors);
        setActor(actor);
    }

    /**
     * Restores the actor remembered by the matching {@link #addActorsFirst(Actor)}.
     * If no stack exists the current actor is cleared; if the stack is empty the call
     * is unbalanced and the current actor is left untouched.
     */
    public static void removeActorsFirst() {
        LinkedBlockingDeque<Actor> actors = actorQueue.get();
        if (Objects.isNull(actors)) {
            resetActor();
            return;
        }
        Actor previousActor = actors.pollFirst();
        if (Objects.isNull(previousActor)) {
            logger.debug("[ActorContextHolder] context actors is empty ");
            return;
        }
        actorQueue.set(actors);
        if (previousActor == NO_ACTOR) {
            resetActor();
        } else {
            setActor(previousActor);
        }
    }

    /**
     * Runs {@code runnable} with {@code actor} as the current actor and restores the
     * previous actor afterwards, even when {@code runnable} throws.
     *
     * @param actor    the actor to run as; when {@code null} the task runs with the current actor
     * @param runnable the task to run
     */
    public static void runAs(Actor actor, Runnable runnable) {
        if (Objects.isNull(actor)) {
            // Nothing would be pushed for a null actor, so nothing may be popped either.
            runnable.run();
            return;
        }
        addActorsFirst(actor);
        try {
            runnable.run();
        } finally {
            removeActorsFirst();
        }
    }

    /**
     * Evaluates {@code supplier} with {@code actor} as the current actor and restores the
     * previous actor afterwards, even when {@code supplier} throws.
     *
     * @param actor    the actor to run as; when {@code null} the supplier runs with the current actor
     * @param supplier the computation to evaluate
     * @param <T>      the result type
     * @return the value produced by {@code supplier}
     */
    public static <T> T runAs(Actor actor, Supplier<T> supplier) {
        if (Objects.isNull(actor)) {
            return supplier.get();
        }
        addActorsFirst(actor);
        try {
            return supplier.get();
        } finally {
            removeActorsFirst();
        }
    }

    /**
     * Tells whether a real tracer is in use: true unless the globally registered tracer
     * reports a {@link NoopSpan} as its active span. A {@code null} active span therefore
     * also counts as "tracer present".
     */
    private static boolean hasTracer() {
        return !(GlobalTracer.get().activeSpan() instanceof NoopSpan);
    }

    private static Actor getLocalActor() {
        return actorHolder.get();
    }

    /** Rebuilds an actor from the baggage of the active span, or returns null without one. */
    private static Actor getTracerActor() {
        Span activeSpan = GlobalTracer.get().scopeManager().activeSpan();
        if (Objects.isNull(activeSpan)) {
            return null;
        }
        Actor actor = Actor.builder().build();
        actor.setActorId(activeSpan.getBaggageItem(X_ACTOR_ID));
        actor.setTenantId(activeSpan.getBaggageItem(X_TENANT_ID));
        return actor;
    }

    private static void setLocalActor(Actor actor) {
        actorHolder.set(actor);
        putLocalMDCContext(actor);
    }

    /**
     * Starts a "set-actor" span (child of the active span when there is one), stores the
     * actor ids as baggage on it and activates it.
     */
    private static void setTracerActor(Actor actor) {
        ScopeManager scopeManager = GlobalTracer.get().scopeManager();
        Span parentSpan = scopeManager.activeSpan();
        Tracer.SpanBuilder spanBuilder = GlobalTracer.get().buildSpan("set-actor");
        Span actorSpan;
        if (Objects.nonNull(parentSpan)) {
            actorSpan = spanBuilder.asChildOf(parentSpan).start();
        } else {
            actorSpan = spanBuilder.start();
        }
        actorSpan.setBaggageItem(X_ACTOR_ID, actor.getActorId());
        actorSpan.setBaggageItem(X_TENANT_ID, actor.getTenantId());
        // NOTE(review): the Scope returned by activate() is never closed and the span is
        // finished while it is still the active span - presumably intentional so the baggage
        // stays reachable through activeSpan(); confirm.
        scopeManager.activate(actorSpan);
        actorSpan.finish();
    }

    private static void resetLocalActor() {
        actorHolder.remove();
        cleanLocalMDCContext();
    }

    /** Clears the actor baggage items on the active span, if there is one. */
    private static void resetTracerActor() {
        Span activeSpan = GlobalTracer.get().activeSpan();
        if (Objects.isNull(activeSpan)) {
            return;
        }
        activeSpan.setBaggageItem(X_ACTOR_ID, null);
        activeSpan.setBaggageItem(X_TENANT_ID, null);
    }

    /** Mirrors the actor ids into the MDC; a null actor or null id removes the entry. */
    private static void putLocalMDCContext(Actor actor) {
        if (Objects.isNull(actor)) {
            cleanLocalMDCContext();
            return;
        }
        replaceMdc(MDC_ACTOR_ID, actor.getActorId());
        replaceMdc(MDC_TENANT_ID, actor.getTenantId());
    }

    private static void cleanLocalMDCContext() {
        MDC.remove(MDC_ACTOR_ID);
        MDC.remove(MDC_TENANT_ID);
    }

    /** Puts a value into the MDC, removing the key instead of storing a null value. */
    private static void replaceMdc(String key, String value) {
        if (value == null) {
            MDC.remove(key);
        } else {
            MDC.put(key, value);
        }
    }

}

这里我们是在操作的时候,通过当前的Span是不是NoopSpan,来判断当前有没有引入具体的opentracing实现。如果没有就基于ThreadLocal,如果有,就基于jaeger实现。

到这里,基本的链路传递就已经完成了。但是还是会有几个问题。
1.我们只需要用来传递上下文,并不想采样。
虽然我们设置了采样率为0,也设置了不收集日志,但是依旧会发送udp数据,并报出如下的warning。

[2021-11-15 15:42:37.196] [ithere-backend] [dev] [trade-id]  WARN 1 --- [-QueueProcessor] i.j.internal.reporters.RemoteReporter    : FlushCommand execution failed! Repeated errors of this command will not be logged.

io.jaegertracing.internal.exceptions.SenderException: Failed to flush spans.
        at io.jaegertracing.thrift.internal.senders.ThriftSender.flush(ThriftSender.java:115)
        at io.jaegertracing.internal.reporters.RemoteReporter$FlushCommand.execute(RemoteReporter.java:160)
        at io.jaegertracing.internal.reporters.RemoteReporter$QueueProcessor.run(RemoteReporter.java:182)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.jaegertracing.internal.exceptions.SenderException: Could not send 2 spans
        at io.jaegertracing.thrift.internal.senders.UdpSender.send(UdpSender.java:85)
        at io.jaegertracing.thrift.internal.senders.ThriftSender.flush(ThriftSender.java:113)
        ... 3 common frames omitted
Caused by: org.apache.thrift.transport.TTransportException: Cannot flush closed transport
        at io.jaegertracing.thrift.internal.reporters.protocols.ThriftUdpTransport.flush(ThriftUdpTransport.java:148)
        at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:73)
        at org.apache.thrift.TServiceClient.sendBaseOneway(TServiceClient.java:66)
        at io.jaegertracing.agent.thrift.Agent$Client.send_emitBatch(Agent.java:70)
        at io.jaegertracing.agent.thrift.Agent$Client.emitBatch(Agent.java:63)
        at io.jaegertracing.thrift.internal.senders.UdpSender.send(UdpSender.java:83)
        ... 4 common frames omitted
Caused by: java.net.PortUnreachableException: ICMP Port Unreachable
        at java.base/java.net.PlainDatagramSocketImpl.send(Native Method)
        at java.base/java.net.DatagramSocket.send(DatagramSocket.java:695)
        at io.jaegertracing.thrift.internal.reporters.protocols.ThriftUdpTransport.flush(ThriftUdpTransport.java:146)
        ... 9 common frames omitted


解决方案
这是由于在JaegerAutoConfiguration中,默认是配置了Reporter的

/**
 * Default {@code Reporter} bean, quoted from JaegerAutoConfiguration.
 *
 * <p>Note that a remote reporter (HTTP or UDP) is always added to the composite;
 * no property visible here switches it off.
 *
 * @param properties       Jaeger configuration properties (senders, remote reporter, log-spans flag)
 * @param metrics          metrics passed on to the remote reporter factories
 * @param reporterAppender optional hook that may append further reporters; may be {@code null}
 * @return a {@code CompositeReporter} wrapping all collected reporters
 */
@ConditionalOnMissingBean
@Bean
public Reporter reporter(JaegerConfigurationProperties properties,
                         Metrics metrics,
                         @Autowired(required = false) ReporterAppender reporterAppender) {

  List<Reporter> reporters = new LinkedList<>();
  RemoteReporter remoteReporter = properties.getRemoteReporter();

  // Exactly one remote reporter is registered: HTTP when a sender URL is set, UDP otherwise.
  JaegerConfigurationProperties.HttpSender httpSender = properties.getHttpSender();
  if (!StringUtils.isEmpty(httpSender.getUrl())) {
    reporters.add(getHttpReporter(metrics, remoteReporter, httpSender));
  } else {
    reporters.add(getUdpReporter(metrics, remoteReporter, properties.getUdpSender()));
  }

  // Optional logging reporter, controlled by the log-spans property.
  if (properties.isLogSpans()) {
    reporters.add(new LoggingReporter());
  }

  // Let the application contribute additional reporters.
  if (reporterAppender != null) {
    reporterAppender.append(reporters);
  }

  return new CompositeReporter(reporters.toArray(new Reporter[reporters.size()]));
}


这样的话,不管怎样都是会去执行发送数据,我们只需要把这个覆盖掉即可。

/**
 * Additional properties under the {@code opentracing.jaeger} prefix that control
 * whether spans are reported at all.
 */
@ConfigurationProperties("opentracing.jaeger")
public class PropagateJaegerConfProperties {
    /** Never null, so {@code getReporter().isEnabled()} is safe when nothing is configured. */
    private Reporter reporter = new Reporter();

    /**
     * @return the reporter settings; never {@code null}
     */
    public Reporter getReporter() {
        return reporter;
    }

    /**
     * @param reporter the reporter settings; {@code null} restores the defaults
     */
    public void setReporter(Reporter reporter) {
        this.reporter = (reporter != null) ? reporter : new Reporter();
    }

    /** Settings bound from {@code opentracing.jaeger.reporter.*}. */
    public static class Reporter{
        /** Whether span reporting is enabled; defaults to {@code false}. */
        private boolean enabled;

        public boolean isEnabled() {
            return enabled;
        }

        public void setEnabled(boolean enabled) {
            this.enabled = enabled;
        }
    }
}
/**
 * Auto-configuration that supplies a {@link Reporter} bean without any remote sender.
 *
 * <p>It is ordered before {@code JaegerAutoConfiguration} and is active when
 * {@code opentracing.jaeger.reporter.enabled} is {@code false} or not set.
 */
@Configuration(value = "com.moensun.spring.boot.propagate.propagateJaegerConfiguration")
@ConditionalOnClass(io.jaegertracing.internal.JaegerTracer.class)
@ConditionalOnMissingBean(io.opentracing.Tracer.class)
@ConditionalOnProperty(value = "opentracing.jaeger.reporter.enabled", havingValue = "false", matchIfMissing = true)
@AutoConfigureBefore(JaegerAutoConfiguration.class)
@EnableConfigurationProperties(PropagateJaegerConfProperties.class)
public class PropagateJaegerConfiguration {

    /**
     * Builds a composite reporter that contains only what the optional appender adds.
     *
     * @param reporterAppender optional hook that may contribute reporters; may be {@code null}
     * @return a {@link CompositeReporter} over the contributed reporters (possibly none)
     */
    @Bean
    @ConditionalOnMissingBean
    public Reporter reporter(@Autowired(required = false) ReporterAppender reporterAppender) {
        final List<Reporter> collected = new LinkedList<>();

        if (null != reporterAppender) {
            reporterAppender.append(collected);
        }

        final Reporter[] delegates = collected.toArray(new Reporter[0]);
        return new CompositeReporter(delegates);
    }

}


只需要配置下就不会去发送数据了。

# Jaeger is used here only to propagate context, so sampling and reporting are switched off.
opentracing:
  jaeger:
    const-sampler:
      decision: false
    probabilistic-sampler:
      sampling-rate: 0
    # No LoggingReporter is registered for finished spans.
    log-spans: false
    reporter:
      # Matches the condition on PropagateJaegerConfiguration, whose Reporter bean has no remote sender.
      enabled: false


2.官方提供的MDCScopeManager,只记录了opentracing的相关数据,我们的业务数据没有。
自己实现一个MDCScopeManager

/**
 * {@link ScopeManager} decorator that mirrors tracing data into the logging MDC.
 *
 * <p>On activation of a span whose context is a {@link JaegerSpanContext}, the trace id,
 * span id and sampled flag are written to the MDC together with the tenant id and actor
 * id read from the span's baggage. Closing the scope restores the previous MDC values.
 */
public class MSMDCScopeManager implements ScopeManager {
    private final ScopeManager wrappedScopeManager;
    private final String mdcTraceIdKey;
    private final String mdcSpanIdKey;
    private final String mdcSampledKey;
    private final String mdcTenantIdKey;
    private final String mdcActorIdKey;

    private MSMDCScopeManager(Builder builder) {
        this.wrappedScopeManager = builder.scopeManager;
        this.mdcTraceIdKey = builder.mdcTraceIdKey;
        this.mdcSpanIdKey = builder.mdcSpanIdKey;
        this.mdcSampledKey = builder.mdcSampledKey;
        this.mdcTenantIdKey = builder.mdcTenantIdKey;
        this.mdcActorIdKey = builder.mdcActorIdKey;
    }

    /**
     * Activates the span on the wrapped manager and updates the MDC.
     *
     * @param span the span to activate
     * @return a scope that restores the previous MDC values when closed
     */
    @Override
    public Scope activate(Span span) {
        return new MDCScope(wrappedScopeManager.activate(span), span);
    }

    @Override
    public Span activeSpan() {
        return wrappedScopeManager.activeSpan();
    }

    /** Builder for {@link MSMDCScopeManager}; every setting has a default. */
    public static class Builder {
        private ScopeManager scopeManager = new ThreadLocalScopeManager();
        private String mdcTraceIdKey = ActorConstants.MDC_TRACE_ID;
        private String mdcSpanIdKey = "spanId";
        private String mdcSampledKey = "sampled";
        // MDC keys, not baggage keys: the defaults must be the MDC_* constants.
        private String mdcTenantIdKey = ActorConstants.MDC_TENANT_ID;
        private String mdcActorIdKey = ActorConstants.MDC_ACTOR_ID;

        public Builder withScopeManager(ScopeManager scopeManager) {
            this.scopeManager = scopeManager;
            return this;
        }

        public Builder withMDCTraceIdKey(String mdcTraceIdKey) {
            this.mdcTraceIdKey = mdcTraceIdKey;
            return this;
        }

        public Builder withMDCSpanIdKey(String mdcSpanIdKey) {
            this.mdcSpanIdKey = mdcSpanIdKey;
            return this;
        }

        public Builder withMDCSampledKey(String mdcSampledKey) {
            this.mdcSampledKey = mdcSampledKey;
            return this;
        }

        public Builder withMDCTenantIdKey(String mdcTenantIdKey) {
            this.mdcTenantIdKey = mdcTenantIdKey;
            return this;
        }

        public Builder withMDCActorIdKey(String mdcActorIdKey) {
            this.mdcActorIdKey = mdcActorIdKey;
            return this;
        }

        public MSMDCScopeManager build() {
            return new MSMDCScopeManager(this);
        }

    }

    /** Scope wrapper that snapshots the MDC on creation and restores it on close. */
    private class MDCScope implements Scope {
        private final Scope wrappedScope;
        private final String previousTraceId;
        private final String previousSpanId;
        private final String previousSampled;
        private final String previousTenantId;
        private final String previousActorId;

        /**
         * Remembers the current MDC values, then writes the values of {@code span}
         * when its context is a {@link JaegerSpanContext}.
         */
        MDCScope(Scope scope, Span span) {
            this.wrappedScope = scope;
            this.previousTraceId = MDC.get(mdcTraceIdKey);
            this.previousSpanId = MDC.get(mdcSpanIdKey);
            this.previousSampled = MDC.get(mdcSampledKey);
            this.previousTenantId = MDC.get(mdcTenantIdKey);
            this.previousActorId = MDC.get(mdcActorIdKey);

            if (span.context() instanceof JaegerSpanContext) {
                putContext((JaegerSpanContext) span.context());
            }
        }

        protected void putContext(JaegerSpanContext spanContext) {
            replace(mdcTraceIdKey, spanContext.toTraceId());
            replace(mdcSpanIdKey, spanContext.toSpanId());
            replace(mdcSampledKey, String.valueOf(spanContext.isSampled()));
            // Baggage is written under the X_* keys (see ActorContextHolder), so it must be
            // read back with the same keys.
            replace(mdcTenantIdKey, spanContext.getBaggageItem(ActorConstants.X_TENANT_ID));
            replace(mdcActorIdKey, spanContext.getBaggageItem(ActorConstants.X_ACTOR_ID));
        }

        /** Sets the MDC entry, or removes it when {@code value} is null. */
        private void replace(String key, String value) {
            if (value == null) {
                MDC.remove(key);
            } else {
                MDC.put(key, value);
            }
        }

        @Override
        public void close() {
            wrappedScope.close();
            replace(mdcTraceIdKey, previousTraceId);
            replace(mdcSpanIdKey, previousSpanId);
            replace(mdcSampledKey, previousSampled);
            replace(mdcTenantIdKey, previousTenantId);
            replace(mdcActorIdKey, previousActorId);
        }
    }
}

 

/**
 * Registers the customizer that plugs {@link MSMDCScopeManager} into the tracer builder.
 */
@Configuration
public class PropagateAutoConfiguration {
    /**
     * @return a customizer that installs a default-configured {@link MSMDCScopeManager}
     *         on each tracer builder it is applied to
     */
    @Bean
    public TracerBuilderCustomizer tracerBuilderCustomizer() {
        return tracerBuilder -> {
            MSMDCScopeManager mdcScopeManager = new MSMDCScopeManager.Builder().build();
            tracerBuilder.withScopeManager(mdcScopeManager);
        };
    }
}


修改下日志的配置文件

<!-- Console pattern; %X{traceId}, %X{tenantId} and %X{actorId} are the MDC keys defined in ActorConstants (MDC_TRACE_ID, MDC_TENANT_ID, MDC_ACTOR_ID). -->
<property name="CONSOLE_LOGGER_PATTERN" value="[%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint}] [${applicationName}] [${environment}] [%X{traceId:-trade-id}] [%X{tenantId:-tenant-id}] [%X{actorId:-actor-id}]  %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>


这样在打印的日志中就可以带上我们的context信息。

至此,完整的链路传递就可以实现了。下次写链路传递中多线程的解决方案。

解决链路传递中(jaeger)多线程问题


关于作者

落雁沙
非典型码农
获得点赞
文章被阅读