背景
在微服务场景下,我们通常需要对特定的流量身份进行标识,如在异地多活场景下,我们需要对来源地不同的流量进行身份标识,以便于我们能流量到对应的机房。如在生产压测场景下,也需要将这部分流量进行特殊处理。
那么我们该怎么区分开这部分流量呢。OpenTracing 便能够完美解决这些(关于OpenTracing的介绍在这篇文章里不做深入介绍,请自行了解)。今天我主要介绍在gRpc链路中利用OpenTracing解决链路信息传递的问题
引入openTracing
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-util</artifactId>
</dependency>
gRpc拦截器
gRpc拦截器分为client端和server端,client端可以在每次发起调用前,优先埋进一些我们需要的字段,而在server端,在收到请求时优先对请求中的数据做一些处理后再转交给指定的服务处理并响应。
我们分别来看下client和server端的处理方式
gRpc client拦截器
/**
* 实现grpc ClientInterceptor接口
*/
public class ClientTracingInterceptor implements ClientInterceptor {
// 构建tracer
private static final Tracer tracer = Tracing.init("tag-test");
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next
) {
// 构建span,span 在openTracing里为最小单元
Span span = tracer.buildSpan("client-span").start();
span.setTag("kind","client");
// 构建scope,scope表示一次生命周期
// 1. 先判断有没有上游活跃的span,如果有直接从scope里面拿
// 2. 如果没有,那么会构建一个新的span
try (Scope ignored = tracer.scopeManager().activate(span)) {
// 传递流量标识
span.setBaggageItem("tag", "test");
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, final Metadata headers){
// 跨进程传递
// grpc client 将metaData headerkey作为唯一标识用于跨进程传递
// 将SpanContext 的信息序列化到 Request.Builder 之中。并将后续操作就可以把序列化之后的信息转换到 Header之中
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS,new TextMap(){
@Override
public void put(String key, String value) {
Metadata.Key<String> headerKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
headers.put(headerKey, value);
}
@Override
public Iterator<Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"TextMapInjectAdapter should only be used with Tracer.inject()"
);
}
});
}
}
// 忽略sendMessage,halfClose,cancel
// 我们只看tag的传递
} finaly {
// 最后必须finish,只有finish后才会上报链路信息
span.finish();
}
}
}
上面的代码我们简单看了下gRpc client端的拦截器实现,主要分为以下几步
-
构建Tracer,并启动
-
构建client的span,标识client
-
客户端拦截器,将tag=test塞到Baggage里面
-
将grpc的header 作为key构建 tracer的header 并tracer.inject 跨进程传递
-
finaly 上报链路信息
gRpc server 拦截器
public class ServerTracingInterceptor implements ServerInterceptor {
// 构建tracer
// tracer 的定义要和client一样,标识同一类型的tracer
private static final Tracer tracer = Tracing.init("tag-test");
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next
) {
// 构建headerMap
Map<String, String> headerMap = new HashMap<>(headerKeys.size());
for (String key : headerKeys) {
if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
String value = headers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
headerMap.put(key, value);
}
}
try {
// 从Carrier 中extract 出SpanContext实例
SpanContext parentSpanCtx =
tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(headers));
// 如果能够拿到父级的context,那么将他作为子级构建起来,也就是作为client端的子级,这样,链路的体现那就是树形的串联,如下
if (parentSpanCtx != null) {
spanBuilder = spanBuilder.asChildOf(parentSpanCtx);
}
} catch (IllegalArgumentException e) {
log.error("")
}
try (Scope ignored = tracer.scopeManager().activate(span)) {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
listenerWithContext) {
private ReqT message;
@Override
public void onReady() {
// 标识拦截OK
try (Scope ignored = tracer.scopeManager().activate(span)) {
super.onReady();
}
}
};
} finaly {
span.finish();
}
}
}
-
构建header 作为key
-
extract上文中传递的信息
-
构建server端的span,区分parent
总结
以上就是一次简单的利用openTracing +grpc拦截器实现的信息透传。