前言
在项目中,我们会需要将执行上下文贯穿整个请求链路。那么如果我们实现这个需求,要考虑几个点。
1.如果请求跨服务,怎么在服务之间传递?
2.异步业务中,如果实现链路上下文的贯穿?
3.如果是简单单体应用,要不要考虑轻量方案?
单体应用中如何实现链路传递
在只考虑spring mvc的情况下,我们基本只需要通过ThreadLocal就可以实现,这个需求。但是,如果业务中有需要使用线程池的地方,就需要自己实现一个线程池的实现类了,或者在execute中自己去处理ThreadLocal的赋值和清空。因为,ThreadLocal可以传递给子线程,但是,线程池是复用线程,不是当前线程的子线程,这时候,需要自己执行线程进行赋值。
在多服务之间进行链路传递
多服务之间的链路传递,最麻烦的就是无法确定服务之间的调用方式。除非去把每个使用到的每种调用方式都去做实现,但是这么做的话,成本就比较高了。考虑到以上因素,我们选择jaeger来作为链路传递的实现方式。
可以看到,基本上主流的调用传递方式都已经有。
如何引入jaeger,这个在之前的文章中已经介绍过。https://mp.weixin.qq.com/s/t2SWMubMUirOObRYhtg5Jg
我们先写一个context的包
引入依赖
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-util</artifactId>
<version>0.33.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.12.2</version>
</dependency>
创建一个Actor对象,来存放上下文的内容。
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class Actor {
private String role;
private String actorId;
private String tenantId;
private Instant time;
@Override
public String toString() {
return "Actor{" +
"role='" + role + '\'' +
", actorId='" + actorId + '\'' +
", tenantId='" + tenantId + '\'' +
", time=" + time +
'}';
}
}
定义一个常量类
public class ActorConstants {
public static final String X_TRACE_ID = "X-MS-TraceId";
public static final String X_ACTOR_ID = "X-MS-ActorId";
public static final String X_TENANT_ID = "X-MS-TenantId";
public static final String X_Time = "X-MS-Time";
public static final String MDC_TRACE_ID = "traceId";
public static final String MDC_ACTOR_ID = "actorId";
public static final String MDC_TENANT_ID = "tenantId";
}
一个具体的处理类
public class ActorContextHolder {
private static final Logger logger = LoggerFactory.getLogger(ActorContextHolder.class);
private ActorContextHolder() {
}
private static final TransmittableThreadLocal<Actor> actorHolder = new TransmittableThreadLocal<>();
private static final TransmittableThreadLocal<LinkedBlockingDeque<Actor>> actorQueue = new TransmittableThreadLocal<>();
public static void setActor(Actor actor) {
if(hasTracer()){
setTracerActor(actor);
}else {
setLocalActor(actor);
}
}
public static Actor getActor() {
if(hasTracer()){
return getTracerActor();
}else {
return getLocalActor();
}
}
public static void resetActor() {
if(hasTracer()){
resetTracerActor();
}else {
resetLocalActor();
}
}
public static void addActorsFirst(Actor actor){
if(Objects.isNull(actor)){
return;
}
LinkedBlockingDeque<Actor> actors = actorQueue.get();
if(Objects.isNull(actors)){
actors = new LinkedBlockingDeque<>();
}
Actor previousActor = getActor();
actors.addFirst(previousActor);
actorQueue.set(actors);
setActor(actor);
}
public static void removeActorsFirst(){
LinkedBlockingDeque<Actor> actors = actorQueue.get();
if(Objects.isNull(actors)){
setActor(null);
return ;
}
try{
Actor previousActor = actors.removeFirst();
actorQueue.set(actors);
setActor(previousActor);
}catch (NoSuchElementException ex){
if(logger.isDebugEnabled()){
logger.info("[ActorContextHolder] context actors is empty ");
}
}
}
public static void runAs(Actor actor,Runnable runnable){
addActorsFirst(actor);
runnable.run();
removeActorsFirst();
}
public static <T> T runAs(Actor actor, Supplier<T> supplier){
addActorsFirst(actor);
T result = supplier.get();
removeActorsFirst();
return result;
}
private static boolean hasTracer(){
return !(GlobalTracer.get().activeSpan() instanceof NoopSpan);
}
private static Actor getLocalActor(){
return actorHolder.get();
}
private static Actor getTracerActor(){
Span activeSpan = GlobalTracer.get().scopeManager().activeSpan();
if(Objects.isNull(activeSpan)){
return null;
}
Actor actor = Actor.builder().build();
actor.setActorId(activeSpan.getBaggageItem(X_ACTOR_ID));
actor.setTenantId(activeSpan.getBaggageItem(X_TENANT_ID));
return actor;
}
private static void setLocalActor(Actor actor){
actorHolder.set(actor);
putLocalMDCContext(actor);
}
private static void setTracerActor(Actor actor){
ScopeManager scopeManager = GlobalTracer.get().scopeManager();
Span parentSpan = scopeManager.activeSpan();
Tracer.SpanBuilder spanBuilder = GlobalTracer.get().buildSpan("set-actor");
Span actorSpan ;
if(Objects.nonNull(parentSpan)){
actorSpan = spanBuilder.asChildOf(parentSpan).start();
}else {
actorSpan = spanBuilder.start();
}
actorSpan.setBaggageItem(X_ACTOR_ID, actor.getActorId());
actorSpan.setBaggageItem(X_TENANT_ID, actor.getTenantId());
scopeManager.activate(actorSpan);
actorSpan.finish();
}
private static void resetLocalActor(){
actorHolder.remove();
cleanLocalMDCContext();
}
private static void resetTracerActor(){
Span activeSpan = GlobalTracer.get().activeSpan();
if(Objects.isNull(activeSpan)){
return;
}
activeSpan.setBaggageItem(X_ACTOR_ID,null);
activeSpan.setBaggageItem(X_TENANT_ID,null);
}
private static void putLocalMDCContext(Actor actor){
if(Objects.nonNull(actor)){
MDC.put(MDC_ACTOR_ID,actor.getActorId());
MDC.put(MDC_TENANT_ID,actor.getTenantId());
}
}
private static void cleanLocalMDCContext(){
MDC.put(MDC_ACTOR_ID,null);
MDC.put(MDC_TENANT_ID,null);
}
}
这里我们是在操作的时候,通过当前的Span是不是NoopSpan,来判断当前有没有引入具体的opentracing实现。如果没有就基于ThreadLocal,如果有,就基于jaeger实现。
到这里,基本的链路传递就已经完成了。但是还是会有几个问题。
1.我们只需要用来传递上下文,并不想采样。
虽然,我们设置了采样率为0,也设置了不收集日志,但是依旧会发送udp数据,会报如下的waring。
[2021-11-15 15:42:37.196] [ithere-backend] [dev] [trade-id] WARN 1 --- [-QueueProcessor] i.j.internal.reporters.RemoteReporter : FlushCommand execution failed! Repeated errors of this command will not be logged.
io.jaegertracing.internal.exceptions.SenderException: Failed to flush spans.
at io.jaegertracing.thrift.internal.senders.ThriftSender.flush(ThriftSender.java:115)
at io.jaegertracing.internal.reporters.RemoteReporter$FlushCommand.execute(RemoteReporter.java:160)
at io.jaegertracing.internal.reporters.RemoteReporter$QueueProcessor.run(RemoteReporter.java:182)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.jaegertracing.internal.exceptions.SenderException: Could not send 2 spans
at io.jaegertracing.thrift.internal.senders.UdpSender.send(UdpSender.java:85)
at io.jaegertracing.thrift.internal.senders.ThriftSender.flush(ThriftSender.java:113)
... 3 common frames omitted
Caused by: org.apache.thrift.transport.TTransportException: Cannot flush closed transport
at io.jaegertracing.thrift.internal.reporters.protocols.ThriftUdpTransport.flush(ThriftUdpTransport.java:148)
at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:73)
at org.apache.thrift.TServiceClient.sendBaseOneway(TServiceClient.java:66)
at io.jaegertracing.agent.thrift.Agent$Client.send_emitBatch(Agent.java:70)
at io.jaegertracing.agent.thrift.Agent$Client.emitBatch(Agent.java:63)
at io.jaegertracing.thrift.internal.senders.UdpSender.send(UdpSender.java:83)
... 4 common frames omitted
Caused by: java.net.PortUnreachableException: ICMP Port Unreachable
at java.base/java.net.PlainDatagramSocketImpl.send(Native Method)
at java.base/java.net.DatagramSocket.send(DatagramSocket.java:695)
at io.jaegertracing.thrift.internal.reporters.protocols.ThriftUdpTransport.flush(ThriftUdpTransport.java:146)
... 9 common frames omitted
解决方案
这是由于在JaegerAutoConfiguration中,默认是配置了Reporter的
@ConditionalOnMissingBean
@Bean
public Reporter reporter(JaegerConfigurationProperties properties,
Metrics metrics,
@Autowired(required = false) ReporterAppender reporterAppender) {
List<Reporter> reporters = new LinkedList<>();
RemoteReporter remoteReporter = properties.getRemoteReporter();
JaegerConfigurationProperties.HttpSender httpSender = properties.getHttpSender();
if (!StringUtils.isEmpty(httpSender.getUrl())) {
reporters.add(getHttpReporter(metrics, remoteReporter, httpSender));
} else {
reporters.add(getUdpReporter(metrics, remoteReporter, properties.getUdpSender()));
}
if (properties.isLogSpans()) {
reporters.add(new LoggingReporter());
}
if (reporterAppender != null) {
reporterAppender.append(reporters);
}
return new CompositeReporter(reporters.toArray(new Reporter[reporters.size()]));
}
这样的话,不管怎样都是会去执行发送数据,我们只需要把这个覆盖掉即可。
@ConfigurationProperties("opentracing.jaeger")
public class PropagateJaegerConfProperties {
private Reporter reporter;
public Reporter getReporter() {
return reporter;
}
public void setReporter(Reporter reporter) {
this.reporter = reporter;
}
public static class Reporter{
private boolean enabled;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}
}
@Configuration(value = "com.moensun.spring.boot.propagate.propagateJaegerConfiguration")
@ConditionalOnClass(io.jaegertracing.internal.JaegerTracer.class)
@ConditionalOnMissingBean(io.opentracing.Tracer.class)
@ConditionalOnProperty(value = "opentracing.jaeger.reporter.enabled", havingValue = "false", matchIfMissing = true)
@AutoConfigureBefore(JaegerAutoConfiguration.class)
@EnableConfigurationProperties(PropagateJaegerConfProperties.class)
public class PropagateJaegerConfiguration {
@ConditionalOnMissingBean
@Bean
public Reporter reporter(@Autowired(required = false) ReporterAppender reporterAppender) {
List<Reporter> reporters = new LinkedList<>();
if (reporterAppender != null) {
reporterAppender.append(reporters);
}
return new CompositeReporter(reporters.toArray(new Reporter[reporters.size()]));
}
}
只需要配置下就不会去发送数据了。
opentracing:
jaeger:
const-sampler:
decision: false
probabilistic-sampler:
sampling-rate: 0
log-spans: false
reporter:
enabled: false
2.官方提供的MDCScopeManager,只记录了opentracing的相关数据,我们的业务数据没有。
自己实现一个MDCScopeManager
public class MSMDCScopeManager implements ScopeManager {
private final ScopeManager wrappedScopeManager;
private final String mdcTraceIdKey;
private final String mdcSpanIdKey;
private final String mdcSampledKey;
private final String mdcTenantIdKey;
private final String mdcActorIdKey;
private MSMDCScopeManager(Builder builder) {
this.wrappedScopeManager = builder.scopeManager;
this.mdcTraceIdKey = builder.mdcTraceIdKey;
this.mdcSpanIdKey = builder.mdcSpanIdKey;
this.mdcSampledKey = builder.mdcSampledKey;
this.mdcTenantIdKey = builder.mdcTenantIdKey;
this.mdcActorIdKey = builder.mdcActorIdKey;
}
@Override
public Scope activate(Span span) {
return new MDCScope(wrappedScopeManager.activate(span), span);
}
@Override
public Span activeSpan() {
return wrappedScopeManager.activeSpan();
}
public static class Builder {
private ScopeManager scopeManager = new ThreadLocalScopeManager();
private String mdcTraceIdKey = "traceId";
private String mdcSpanIdKey = "spanId";
private String mdcSampledKey = "sampled";
private String mdcTenantIdKey = ActorConstants.X_TENANT_ID;
private String mdcActorIdKey = ActorConstants.X_ACTOR_ID;
public Builder withScopeManager(ScopeManager scopeManager) {
this.scopeManager = scopeManager;
return this;
}
public Builder withMDCTraceIdKey(String mdcTraceIdKey) {
this.mdcTraceIdKey = mdcTraceIdKey;
return this;
}
public Builder withMDCSpanIdKey(String mdcSpanIdKey) {
this.mdcSpanIdKey = mdcSpanIdKey;
return this;
}
public Builder withMDCSampledKey(String mdcSampledKey) {
this.mdcSampledKey = mdcSampledKey;
return this;
}
public Builder withMDCTenantIdKey(String mdcTenantIdKey) {
this.mdcTenantIdKey = mdcTenantIdKey;
return this;
}
public Builder withMDCActorIdKey(String mdcActorIdKey) {
this.mdcActorIdKey = mdcActorIdKey;
return this;
}
public MSMDCScopeManager build() {
return new MSMDCScopeManager(this);
}
}
private class MDCScope implements Scope {
private final Scope wrappedScope;
private final String previousTraceId;
private final String previousSpanId;
private final String previousSampled;
private final String previousTenantId;
private final String previousActorId;
/**
* mdcScope.
*/
MDCScope(Scope scope, Span span) {
this.wrappedScope = scope;
this.previousTraceId = MDC.get(mdcTraceIdKey);
this.previousSpanId = MDC.get(mdcSpanIdKey);
this.previousSampled = MDC.get(mdcSampledKey);
this.previousTenantId = MDC.get(mdcTenantIdKey);
this.previousActorId = MDC.get(mdcActorIdKey);
if (span.context() instanceof JaegerSpanContext) {
putContext((JaegerSpanContext) span.context());
}
}
protected void putContext(JaegerSpanContext spanContext) {
replace(mdcTraceIdKey, spanContext.toTraceId());
replace(mdcSpanIdKey, spanContext.toSpanId());
replace(mdcSampledKey, String.valueOf(spanContext.isSampled()));
replace(mdcTenantIdKey, spanContext.getBaggageItem(ActorConstants.MDC_TENANT_ID));
replace(mdcActorIdKey, spanContext.getBaggageItem(ActorConstants.MDC_ACTOR_ID));
}
private void replace(String key, String value) {
if (value == null) {
MDC.remove(key);
} else {
MDC.put(key, value);
}
}
@Override
public void close() {
wrappedScope.close();
replace(mdcTraceIdKey, previousTraceId);
replace(mdcSpanIdKey, previousSpanId);
replace(mdcSampledKey, previousSampled);
replace(mdcTenantIdKey, previousTenantId);
replace(mdcActorIdKey, previousActorId);
}
}
}
@Configuration
public class PropagateAutoConfiguration {
@Bean
public TracerBuilderCustomizer tracerBuilderCustomizer() {
return builder -> builder.withScopeManager(new MSMDCScopeManager.Builder().build());
}
}
修改下日志的配置文件
<property name="CONSOLE_LOGGER_PATTERN" value="[%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint}] [${applicationName}] [${environment}] [%X{traceId:-trade-id}] [%X{companyId:-company-id}] [%X{operatorId:-operator-id}] %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
这样在打印的日志中就可以带上我们的context信息。
至此,完整的链路传递就可以实现了。下次写链路传递中多线程的解决方案。