gRPC的链路追踪
承蒙大家厚爱,我的《Go语言之路》的纸质版图书已经上架京东,有需要的朋友请点击 此链接 购买。
OpenTelemetry 的第三方软件包合集(opentelemetry-go-contrib)为社区中多个常用库提供了 OpenTelemetry 支持。随着 OpenTelemetry 的不断迭代,相信整个链路追踪的生态也会越发完善。
gRPC的链路追踪
要采集 gRPC 的 trace 数据,推荐使用 otelgrpc。
安装依赖。
go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc
这里使用一个简单的 hello 程序,其 proto 文件内容如下。
syntax = "proto3"; // version declaration: use Protocol Buffers v3
package pb; // package name
// Greeter is the service definition.
service Greeter {
// SayHello takes a HelloRequest and returns a HelloResponse.
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// HelloRequest is the request message.
message HelloRequest {
string name = 1;
}
// HelloResponse is the response message.
message HelloResponse {
string reply = 1;
}
Server端
在创建 gRPC Server 时,按如下方式将 StatsHandler 设置为 otelgrpc.NewServerHandler()。
import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
// ...
s := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()), // install the otelgrpc server handler as the StatsHandler
)
完整代码如下:
package main
import (
"context"
"fmt"
"log"
"net"
"time"
"hello_server/pb"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
const (
// serviceName is the value used for the service-name resource attribute.
serviceName = "gRPC-Jaeger-Demo"
jaegerRPCEndpoint = "127.0.0.1:4317" // Jaeger gRPC port
)
// tracer is the package-level tracer used by the SayHello handler.
var tracer = otel.Tracer("grpc-example")
// newJaegerTraceProvider builds a TracerProvider that exports spans to the
// collector listening on jaegerRPCEndpoint, using OTLP over gRPC.
//
// Every span is sampled, and spans are exported in batches with a one-second
// batch timeout. The provider is tagged with serviceName as its service name.
// The caller owns the returned provider and should call Shutdown on it.
func newJaegerTraceProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
	// OTLP exporter speaking gRPC (not HTTP); WithInsecure turns off
	// transport security for the connection to the local collector.
	exp, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(jaegerRPCEndpoint),
		otlptracegrpc.WithInsecure(),
	)
	if err != nil {
		return nil, err
	}

	res, err := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(serviceName)))
	if err != nil {
		// The exporter would otherwise be abandoned without ever being
		// shut down. Best effort: the resource error is the one to report.
		_ = exp.Shutdown(ctx)
		return nil, err
	}

	traceProvider := sdktrace.NewTracerProvider(
		sdktrace.WithResource(res),
		sdktrace.WithSampler(sdktrace.AlwaysSample()), // sample every span
		sdktrace.WithBatcher(exp, sdktrace.WithBatchTimeout(time.Second)),
	)
	return traceProvider, nil
}
// initTracer creates the trace provider and registers it, along with a
// composite TraceContext + Baggage propagator, as the OpenTelemetry globals.
// It returns the provider so the caller can shut it down.
func initTracer(ctx context.Context) (*sdktrace.TracerProvider, error) {
	provider, err := newJaegerTraceProvider(ctx)
	if err != nil {
		return nil, err
	}

	propagator := propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	)
	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagator)

	return provider, nil
}
// server is the Greeter service implementation registered with the gRPC
// server. It embeds pb.UnimplementedGreeterServer from the generated package.
type server struct {
pb.UnimplementedGreeterServer
}
// SayHello handles the Greeter.SayHello RPC.
//
// It records a "SayHello" span carrying the requested name together with the
// client-id and user-id values from the incoming metadata, then replies with
// "Hello <name>". It never returns an error.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	// The ok result is deliberately ignored: when the context carries no
	// metadata md is nil, md.Get yields no values, and the attributes are
	// simply recorded empty.
	md, _ := metadata.FromIncomingContext(ctx)

	// Read the name once through the getter so the span attribute and the
	// reply always agree (the generated getter also tolerates a nil message).
	name := in.GetName()

	// The derived context is discarded because nothing downstream needs it.
	_, span := tracer.Start(ctx, "SayHello",
		trace.WithAttributes(
			attribute.String("name", name),
			attribute.StringSlice("client-id", md.Get("client-id")),
			attribute.StringSlice("user-id", md.Get("user-id")),
		),
	)
	defer span.End()

	return &pb.HelloResponse{Reply: "Hello " + name}, nil
}
// main runs the server and exits with a non-zero status if it fails.
//
// All work happens in run so that its deferred cleanup (flushing and shutting
// down the tracer provider) completes before log.Fatal terminates the process.
func main() {
	if err := run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

// run initializes tracing, listens on TCP port 8972 and serves the Greeter
// service until Serve returns. It returns the error that stopped the server,
// or nil if Serve returned without one.
func run(ctx context.Context) error {
	tp, err := initTracer(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	// Listen on local port 8972.
	lis, err := net.Listen("tcp", ":8972")
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	// Create the gRPC server with the otelgrpc handler installed as the
	// StatsHandler.
	s := grpc.NewServer(
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
	)
	pb.RegisterGreeterServer(s, &server{}) // register the Greeter service

	// Serve blocks until the server stops.
	if err := s.Serve(lis); err != nil {
		return fmt.Errorf("failed to serve: %w", err)
	}
	return nil
}
Client端
在建立连接时,将 StatsHandler 指定为 otelgrpc.NewClientHandler()。
import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
// ...
conn, err := grpc.NewClient(
*addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()), // install the otelgrpc client handler as the StatsHandler
)
完整代码:
package main
import (
"context"
"flag"
"log"
"time"
"hello_client/pb"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)
const (
// serviceName is the value used for the service-name resource attribute.
serviceName = "gRPC-Jaeger-Demo"
// jaegerEndpoint is the host:port handed to the OTLP gRPC exporter.
jaegerEndpoint = "127.0.0.1:4317"
)
// tracer is the package-level tracer used to start the client's span.
var tracer = otel.Tracer("grpc-client-example")
// newJaegerTraceProvider builds a TracerProvider that exports spans to the
// collector listening on jaegerEndpoint, using OTLP over gRPC.
//
// Every span is sampled, and spans are exported in batches with a one-second
// batch timeout. The provider is tagged with serviceName as its service name.
// The caller owns the returned provider and should call Shutdown on it.
func newJaegerTraceProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
	// OTLP exporter speaking gRPC (not HTTP); WithInsecure turns off
	// transport security for the connection to the local collector.
	exp, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(jaegerEndpoint),
		otlptracegrpc.WithInsecure(),
	)
	if err != nil {
		return nil, err
	}

	res, err := resource.New(ctx, resource.WithAttributes(semconv.ServiceName(serviceName)))
	if err != nil {
		// The exporter would otherwise be abandoned without ever being
		// shut down. Best effort: the resource error is the one to report.
		_ = exp.Shutdown(ctx)
		return nil, err
	}

	traceProvider := sdktrace.NewTracerProvider(
		sdktrace.WithResource(res),
		sdktrace.WithSampler(sdktrace.AlwaysSample()), // sample every span
		sdktrace.WithBatcher(exp, sdktrace.WithBatchTimeout(time.Second)),
	)
	return traceProvider, nil
}
// initTracer creates the trace provider and registers it, along with a
// composite TraceContext + Baggage propagator, as the OpenTelemetry globals.
// It returns the provider so the caller can shut it down.
func initTracer(ctx context.Context) (*sdktrace.TracerProvider, error) {
	provider, err := newJaegerTraceProvider(ctx)
	if err != nil {
		return nil, err
	}

	propagator := propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	)
	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagator)

	return provider, nil
}
const (
// defaultName is the default value of the -name flag.
defaultName = "world"
)
// Command-line flags; the values are read after flag.Parse is called in main.
var (
addr = flag.String("addr", "127.0.0.1:8972", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
// stepError labels a failure with the step of run that produced it. Its
// message has the form "<step>: <cause>".
type stepError struct {
	step string
	err  error
}

// Error implements the error interface.
func (e *stepError) Error() string { return e.step + ": " + e.err.Error() }

// Unwrap returns the underlying cause.
func (e *stepError) Unwrap() error { return e.err }

// main parses the flags, runs the client and exits with a non-zero status if
// it fails.
//
// All work happens in run so that its deferred cleanup (ending the span,
// closing the connection, flushing the tracer provider) completes before
// log.Fatal terminates the process; otherwise the span of a failed call would
// never be exported.
func main() {
	flag.Parse()
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run initializes tracing, connects to the server at *addr, performs one
// SayHello call inside a "c.SayHello" span and logs the reply. It returns the
// first error encountered, labelled with the step that failed.
func run() error {
	tp, err := initTracer(context.Background())
	if err != nil {
		return err
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	// Connect to the server; transport security is disabled here.
	conn, err := grpc.NewClient(
		*addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()), // install the otelgrpc client handler
	)
	if err != nil {
		return &stepError{step: "did not connect", err: err}
	}
	defer func() { _ = conn.Close() }() // best effort: the process is about to exit
	c := pb.NewGreeterClient(conn)

	// Attach request metadata to the outgoing context.
	md := metadata.Pairs(
		"timestamp", time.Now().Format(time.StampNano),
		"client-id", "hello-client-q1mi",
		"user-id", "7",
	)
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	ctx, span := tracer.Start(ctx, "c.SayHello", trace.WithAttributes(attribute.String("name", "gRPC-client")))
	// Deferred calls run last-in-first-out, so the span ends before the
	// connection is closed and the provider is shut down.
	defer span.End()

	// Perform the RPC and log the reply.
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		return &stepError{step: "could not greet", err: err}
	}
	log.Printf("Greeting: %s", r.GetReply())
	return nil
}
trace 数据: