gRPC核心原理
2026/1/15大约 6 分钟gRPC原理源码分析
gRPC 核心原理
一、gRPC 架构概述
1.1 整体架构
1.2 核心组件
| 组件 | 说明 |
|---|---|
| Channel | 客户端与服务端的连接抽象 |
| Stub | 客户端代理,封装 RPC 调用 |
| Server | 服务端,处理请求 |
| Transport | 传输层,基于 HTTP/2 |
| Codec | 编解码器,默认 Protobuf |
| Interceptor | 拦截器,中间件机制 |
二、HTTP/2 协议基础
2.1 HTTP/2 核心特性
2.2 帧结构
+-----------------------------------------------+
| Length (24) |
+---------------+---------------+---------------+
| Type (8) | Flags (8) |
+-+-------------+---------------+-------------------------------+
|R| Stream Identifier (31) |
+=+=============================================================+
| Frame Payload (0...) ...
+---------------------------------------------------------------+帧类型:
- DATA:传输数据
- HEADERS:传输头部
- SETTINGS:连接设置
- PING:心跳检测
- GOAWAY:关闭连接
- RST_STREAM:重置流
2.3 多路复用原理
优势:
- 减少连接数
- 避免队头阻塞
- 提高并发性能
三、Protobuf 编码原理
3.1 编码格式
| Tag | Value | Tag | Value | ...
Tag = (field_number << 3) | wire_typeWire Type:
| Type | 含义 | 用于 |
|---|---|---|
| 0 | Varint | int32, int64, bool |
| 1 | 64-bit | fixed64, double |
| 2 | Length-delimited | string, bytes, message |
| 5 | 32-bit | fixed32, float |
3.2 Varint 编码
// 数字 300 的 Varint 编码
// 300 = 100101100 (二进制)
// 分组: 0000010 0101100
// 加标记: 10101100 00000010
// 结果: [0xAC, 0x02]
func encodeVarint(n uint64) []byte {
var buf []byte
for n >= 0x80 {
buf = append(buf, byte(n)|0x80)
n >>= 7
}
buf = append(buf, byte(n))
return buf
}3.3 消息编码示例
message User {
int32 id = 1;
string name = 2;
}编码示例:
// User{id: 1, name: "Tom"} 编码结果
08 01 // field 1, varint, value=1
12 03 54 6F 6D // field 2, length=3, "Tom"四、Channel 与连接管理
4.1 Channel 结构
// ClientConn 是 Channel 的 Go 实现
type ClientConn struct {
target string // 目标地址
authority string // 权限
dopts dialOptions // 连接选项
csMgr *connectivityStateManager // 连接状态
balancerWrapper *ccBalancerWrapper // 负载均衡
resolverWrapper *ccResolverWrapper // 服务发现
conns map[*addrConn]struct{} // 子连接
}4.2 连接状态机
4.3 连接建立流程
五、服务发现与负载均衡
5.1 Resolver 解析器
// Resolver 接口
type Resolver interface {
ResolveNow(ResolveNowOptions)
Close()
}
// Builder 接口
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}5.2 内置解析器
// DNS 解析器
resolver.Register(&dnsBuilder{})
// passthrough 解析器(直接使用地址)
resolver.Register(&passthroughBuilder{})
// 使用示例
conn, _ := grpc.Dial("dns:///service.example.com:50051")
conn, _ := grpc.Dial("passthrough:///localhost:50051")5.3 负载均衡器
// Balancer 接口
type Balancer interface {
UpdateClientConnState(ClientConnState) error
ResolverError(error)
UpdateSubConnState(SubConn, SubConnState)
Close()
}
// Picker 选择器
type Picker interface {
Pick(info PickInfo) (PickResult, error)
}5.4 负载均衡策略
// 配置负载均衡
conn, _ := grpc.Dial(
"dns:///service:50051",
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)六、RPC 调用流程
6.1 一元 RPC 流程
6.2 流式 RPC 流程
6.3 调用栈
// 客户端调用栈
client.SayHello(ctx, req)
-> stub.Invoke()
-> ClientConn.Invoke()
-> invoke()
-> newClientStream()
-> SendMsg()
-> RecvMsg()七、拦截器实现原理
7.1 拦截器链
// 一元拦截器
type UnaryServerInterceptor func(
ctx context.Context,
req interface{},
info *UnaryServerInfo,
handler UnaryHandler,
) (interface{}, error)
// 链式调用
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
// 构建调用链
chain := handler
for i := len(interceptors) - 1; i >= 0; i-- {
interceptor := interceptors[i]
next := chain
chain = func(ctx context.Context, req interface{}) (interface{}, error) {
return interceptor(ctx, req, info, next)
}
}
return chain(ctx, req)
}
}7.2 执行顺序
八、序列化与编解码
8.1 Codec 接口
type Codec interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error
Name() string
}8.2 Protobuf Codec
type protoCodec struct{}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
vv, ok := v.(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to marshal")
}
return proto.Marshal(vv)
}
func (protoCodec) Unmarshal(data []byte, v interface{}) error {
vv, ok := v.(proto.Message)
if !ok {
return fmt.Errorf("failed to unmarshal")
}
return proto.Unmarshal(data, vv)
}8.3 压缩器
type Compressor interface {
Compress(w io.Writer) (io.WriteCloser, error)
Decompress(r io.Reader) (io.Reader, error)
Name() string
}
// 注册 gzip 压缩
encoding.RegisterCompressor(gzip.NewCompressor())
// 使用压缩
conn, _ := grpc.Dial(addr, grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name),
))九、超时与取消机制
9.1 Context 传递
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.SayHello(ctx, req)9.2 超时传播
9.3 取消传播
// 客户端取消
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(2 * time.Second)
cancel() // 取消请求
}()
resp, err := client.SayHello(ctx, req)
// err: context canceled十、错误处理机制
10.1 Status 结构
type Status struct {
code codes.Code
message string
details []*anypb.Any
}
// 创建错误
st := status.New(codes.NotFound, "用户不存在")
st, _ = st.WithDetails(&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequest_FieldViolation{
{Field: "id", Description: "无效的用户ID"},
},
})
return nil, st.Err()10.2 错误传输
HTTP/2 响应头:
grpc-status: 5
grpc-message: 用户不存在十一、健康检查
11.1 健康检查协议
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}11.2 实现健康检查
import "google.golang.org/grpc/health"
import healthpb "google.golang.org/grpc/health/grpc_health_v1"
// 注册健康检查服务
healthServer := health.NewServer()
healthpb.RegisterHealthServer(s, healthServer)
// 设置服务状态
healthServer.SetServingStatus("myservice", healthpb.HealthCheckResponse_SERVING)