拦截器与中间件
2026/1/15 · 大约 4 分钟 · gRPC · 拦截器 · Interceptor
拦截器与中间件
一、拦截器概述
1.1 什么是拦截器
拦截器是 gRPC 的中间件机制,可以在 RPC 调用前后执行自定义逻辑。
1.2 拦截器类型
| 类型 | 说明 |
|---|---|
| UnaryServerInterceptor | 服务端一元拦截器 |
| StreamServerInterceptor | 服务端流拦截器 |
| UnaryClientInterceptor | 客户端一元拦截器 |
| StreamClientInterceptor | 客户端流拦截器 |
二、服务端拦截器
2.1 一元拦截器
func UnaryServerInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 前置处理
start := time.Now()
log.Printf("开始调用: %s", info.FullMethod)
// 调用实际处理函数
resp, err := handler(ctx, req)
// 后置处理
log.Printf("调用完成: %s, 耗时: %v", info.FullMethod, time.Since(start))
return resp, err
}2.2 注册拦截器
func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(UnaryServerInterceptor),
)
// ...
}2.3 流拦截器
func StreamServerInterceptor(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
log.Printf("流开始: %s", info.FullMethod)
err := handler(srv, ss)
log.Printf("流结束: %s", info.FullMethod)
return err
}三、客户端拦截器
3.1 一元拦截器
func UnaryClientInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
log.Printf("客户端调用: %s", method)
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("客户端完成: %s, 耗时: %v", method, time.Since(start))
return err
}3.2 注册拦截器
func main() {
conn, _ := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(UnaryClientInterceptor),
)
// ...
}四、常用拦截器实现
4.1 日志拦截器
func LoggingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// 记录请求
log.Printf("[gRPC] 方法: %s, 请求: %+v", info.FullMethod, req)
resp, err := handler(ctx, req)
// 记录响应
duration := time.Since(start)
if err != nil {
log.Printf("[gRPC] 方法: %s, 错误: %v, 耗时: %v", info.FullMethod, err, duration)
} else {
log.Printf("[gRPC] 方法: %s, 响应: %+v, 耗时: %v", info.FullMethod, resp, duration)
}
return resp, err
}4.2 认证拦截器
// AuthInterceptor is a server-side unary interceptor that rejects calls
// lacking a valid token in the "authorization" metadata key.
//
// It returns codes.Unauthenticated when the context carries no incoming
// metadata, when the key is absent, or when validateToken rejects the token.
// Otherwise the handler is invoked and its results are returned unchanged.
//
// The token may be sent either bare or as "Bearer <token>"; the optional
// scheme prefix is removed before validation so that clients using the
// conventional Bearer form are accepted as well.
func AuthInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	// Read the token from the incoming metadata.
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Error(codes.Unauthenticated, "缺少元数据")
	}
	tokens := md.Get("authorization")
	if len(tokens) == 0 {
		return nil, status.Error(codes.Unauthenticated, "缺少 token")
	}

	// Only the first value of the key is considered.
	token := tokens[0]

	// Strip an optional "Bearer " scheme prefix. Done by hand rather than with
	// strings.TrimPrefix so the block needs no additional import.
	const bearerPrefix = "Bearer "
	if len(token) >= len(bearerPrefix) && token[:len(bearerPrefix)] == bearerPrefix {
		token = token[len(bearerPrefix):]
	}

	// Validate the token.
	if !validateToken(token) {
		return nil, status.Error(codes.Unauthenticated, "无效的 token")
	}
	return handler(ctx, req)
}
func validateToken(token string) bool {
// 实现 token 验证逻辑
return token == "valid-token"
}4.3 恢复拦截器
func RecoveryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("[Recovery] panic: %v\n%s", r, debug.Stack())
err = status.Error(codes.Internal, "内部错误")
}
}()
return handler(ctx, req)
}4.4 限流拦截器
import "golang.org/x/time/rate"
var limiter = rate.NewLimiter(100, 10) // 每秒100个请求,突发10个
func RateLimitInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if !limiter.Allow() {
return nil, status.Error(codes.ResourceExhausted, "请求过于频繁")
}
return handler(ctx, req)
}4.5 超时拦截器
func TimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return handler(ctx, req)
}
}五、链式拦截器
5.1 使用 ChainUnaryInterceptor
import "google.golang.org/grpc"
func main() {
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
RecoveryInterceptor,
LoggingInterceptor,
AuthInterceptor,
RateLimitInterceptor,
),
)
// ...
}5.2 执行顺序
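请求按注册顺序依次进入各拦截器:RecoveryInterceptor → LoggingInterceptor → AuthInterceptor → RateLimitInterceptor → 业务处理函数;处理完成后,响应按相反的顺序逐层返回。也就是说,最先注册的拦截器位于最外层,最后注册的拦截器最靠近实际的处理函数。
六、元数据传递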
6.1 客户端发送元数据
func main() {
// 创建元数据
md := metadata.New(map[string]string{
"authorization": "Bearer token123",
"request-id": "req-001",
})
// 附加到 context
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 调用服务
resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
}6.2 服务端读取元数据
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 获取元数据
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if tokens := md.Get("authorization"); len(tokens) > 0 {
log.Printf("Token: %s", tokens[0])
}
if ids := md.Get("request-id"); len(ids) > 0 {
log.Printf("Request ID: %s", ids[0])
}
}
return &pb.User{Id: req.Id, Name: "张三"}, nil
}6.3 服务端返回元数据
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 设置响应头
header := metadata.Pairs("x-response-id", "resp-001")
grpc.SendHeader(ctx, header)
// 设置响应尾
trailer := metadata.Pairs("x-processing-time", "100ms")
grpc.SetTrailer(ctx, trailer)
return &pb.User{Id: req.Id, Name: "张三"}, nil
}七、完整示例
7.1 服务端
package main
import (
"context"
"log"
"net"
"runtime/debug"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
pb "demo/pb"
)
// LoggingInterceptor is the logging interceptor: it runs the handler and then
// logs the full method name together with the time the call took. The
// handler's results are returned unchanged.
func LoggingInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	began := time.Now()
	resp, err := handler(ctx, req)
	elapsed := time.Since(began)
	log.Printf("[%s] %v", info.FullMethod, elapsed)
	return resp, err
}
// RecoveryInterceptor is the recovery interceptor: if the handler panics, the
// panic value and stack trace are logged and a codes.Internal error is
// returned instead. Without a panic the handler's results pass through.
func RecoveryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (resp interface{}, err error) {
	// The results are named so the deferred function can overwrite err.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.Printf("panic: %v\n%s", r, debug.Stack())
		err = status.Error(codes.Internal, "内部错误")
	}()

	resp, err = handler(ctx, req)
	return resp, err
}
// AuthInterceptor is the authentication interceptor: the call proceeds only
// when the first "authorization" metadata value is exactly
// "Bearer valid-token". Missing metadata, a missing key, or any other value
// yields codes.Unauthenticated and the handler is not invoked.
func AuthInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Error(codes.Unauthenticated, "缺少元数据")
	}

	// Accept path first; everything else falls through to the rejection.
	if tokens := md.Get("authorization"); len(tokens) > 0 && tokens[0] == "Bearer valid-token" {
		return handler(ctx, req)
	}
	return nil, status.Error(codes.Unauthenticated, "认证失败")
}
// main listens on TCP port 50051, creates a gRPC server with the recovery,
// logging and authentication interceptors chained in that order, registers
// the user service and serves until Serve returns. Failures to listen or to
// serve are reported and terminate the process instead of being ignored.
func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		// Without a listener there is nothing to serve on.
		log.Fatalf("监听失败: %v", err)
	}

	s := grpc.NewServer(
		grpc.ChainUnaryInterceptor(
			RecoveryInterceptor,
			LoggingInterceptor,
			AuthInterceptor,
		),
	)

	// NOTE(review): the server type is not defined in this listing; it must
	// be provided elsewhere for the example to build.
	pb.RegisterUserServiceServer(s, &server{})

	log.Println("服务器启动: :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("服务异常退出: %v", err)
	}
}