四种通信模式
2026/1/15 · 大约 4 分钟 · gRPC · Streaming · 通信模式
四种通信模式
一、一元 RPC(Unary RPC)
1.1 定义
// UserService declares a unary RPC: one request message in, one response
// message out.
service UserService {
  rpc GetUser(GetUserRequest) returns (User);
}

// GetUserRequest carries the id sent to GetUser.
message GetUserRequest {
  int32 id = 1;
}

// User is the response message of GetUser.
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}
1.2 服务端实现
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
// 模拟查询用户
user := &pb.User{
Id: req.Id,
Name: "张三",
Email: "zhangsan@example.com",
}
return user, nil
}1.3 客户端调用
func main() {
conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
defer conn.Close()
client := pb.NewUserServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
log.Fatal(err)
}
fmt.Printf("用户: %s (%s)\n", user.Name, user.Email)
}二、服务端流(Server Streaming)
2.1 定义
// UserService declares a server-streaming RPC: one request in, a stream of
// User messages out.
service UserService {
  rpc ListUsers(ListUsersRequest) returns (stream User);
}

// ListUsersRequest is the single request message sent to ListUsers.
message ListUsersRequest {
  int32 page_size = 1;
}
2.2 服务端实现
func (s *server) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
users := []pb.User{
{Id: 1, Name: "张三", Email: "zhangsan@example.com"},
{Id: 2, Name: "李四", Email: "lisi@example.com"},
{Id: 3, Name: "王五", Email: "wangwu@example.com"},
}
for _, user := range users {
// 发送每个用户
if err := stream.Send(&user); err != nil {
return err
}
time.Sleep(500 * time.Millisecond) // 模拟延迟
}
return nil
}2.3 客户端调用
func main() {
conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
defer conn.Close()
client := pb.NewUserServiceClient(conn)
stream, err := client.ListUsers(context.Background(), &pb.ListUsersRequest{PageSize: 10})
if err != nil {
log.Fatal(err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("收到用户: %s\n", user.Name)
}
}三、客户端流(Client Streaming)
3.1 定义
// FileService declares a client-streaming RPC: a stream of FileChunk messages
// in, one UploadResponse out.
service FileService {
  rpc UploadFile(stream FileChunk) returns (UploadResponse);
}

// FileChunk is one message of the upload stream.
message FileChunk {
  string filename = 1;
  bytes data = 2;
  int32 chunk_number = 3;
}

// UploadResponse is the single reply sent once the upload stream ends.
message UploadResponse {
  string filename = 1;
  int64 size = 2;
  bool success = 3;
}
3.2 服务端实现
func (s *server) UploadFile(stream pb.FileService_UploadFileServer) error {
var filename string
var totalSize int64
for {
chunk, err := stream.Recv()
if err == io.EOF {
// 客户端发送完毕
return stream.SendAndClose(&pb.UploadResponse{
Filename: filename,
Size: totalSize,
Success: true,
})
}
if err != nil {
return err
}
filename = chunk.Filename
totalSize += int64(len(chunk.Data))
log.Printf("收到文件块 %d, 大小: %d", chunk.ChunkNumber, len(chunk.Data))
}
}3.3 客户端调用
func main() {
conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
defer conn.Close()
client := pb.NewFileServiceClient(conn)
stream, err := client.UploadFile(context.Background())
if err != nil {
log.Fatal(err)
}
// 模拟发送文件块
for i := 0; i < 5; i++ {
chunk := &pb.FileChunk{
Filename: "test.txt",
Data: []byte(fmt.Sprintf("chunk %d data", i)),
ChunkNumber: int32(i),
}
if err := stream.Send(chunk); err != nil {
log.Fatal(err)
}
}
// 关闭发送并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
fmt.Printf("上传完成: %s, 大小: %d\n", resp.Filename, resp.Size)
}四、双向流(Bidirectional Streaming)
4.1 定义
// ChatService declares a bidirectional-streaming RPC: both sides exchange a
// stream of ChatMessage values.
service ChatService {
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

// ChatMessage is the message type used in both directions of Chat.
message ChatMessage {
  string user = 1;
  string content = 2;
  int64 timestamp = 3;
}
4.2 服务端实现
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("收到消息: [%s] %s", msg.User, msg.Content)
// 回复消息
reply := &pb.ChatMessage{
User: "Server",
Content: "收到: " + msg.Content,
Timestamp: time.Now().Unix(),
}
if err := stream.Send(reply); err != nil {
return err
}
}
}4.3 客户端调用
func main() {
conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
defer conn.Close()
client := pb.NewChatServiceClient(conn)
stream, err := client.Chat(context.Background())
if err != nil {
log.Fatal(err)
}
// 启动接收协程
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("收到: [%s] %s\n", msg.User, msg.Content)
}
}()
// 发送消息
messages := []string{"你好", "今天天气不错", "再见"}
for _, content := range messages {
msg := &pb.ChatMessage{
User: "Client",
Content: content,
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
stream.CloseSend()
time.Sleep(time.Second) // 等待接收完成
}五、完整示例
5.1 Proto 定义
syntax = "proto3";

package demo;

option go_package = "demo/pb";

// User service: one RPC for each of the four communication modes.
service UserService {
  // Unary RPC.
  rpc GetUser(GetUserRequest) returns (User);
  // Server streaming.
  rpc ListUsers(ListUsersRequest) returns (stream User);
  // Client streaming.
  rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateResponse);
  // Bidirectional streaming.
  rpc SyncUsers(stream User) returns (stream User);
}

// User is returned by GetUser and ListUsers and exchanged in both directions
// by SyncUsers.
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

// GetUserRequest is the request message of GetUser.
message GetUserRequest {
  int32 id = 1;
}

// ListUsersRequest is the request message of ListUsers.
message ListUsersRequest {
  int32 page_size = 1;
}

// CreateUserRequest is one element of the BatchCreateUsers request stream.
message CreateUserRequest {
  string name = 1;
  string email = 2;
}

// BatchCreateResponse is the single reply of BatchCreateUsers.
message BatchCreateResponse {
  int32 count = 1;
  bool success = 2;
}
5.2 服务端完整实现
package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"

	pb "demo/pb"
)
// server is the receiver type for the UserService handlers in this file.
// It embeds pb.UnimplementedUserServiceServer from the generated package
// (not shown here).
type server struct {
	pb.UnimplementedUserServiceServer
}
// GetUser is the unary RPC. It replies with a fixed name and email and an Id
// copied from the request; it never returns an error.
func (s *server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
	resp := &pb.User{}
	resp.Id = req.Id
	resp.Name = "张三"
	resp.Email = "zhangsan@example.com"
	return resp, nil
}
// ListUsers is the server-streaming RPC. It sends five generated users
// (ids 1 through 5), pausing 500ms after each send.
//
// It returns the first Send error, or the stream context's error if the RPC
// is cancelled while waiting between sends.
func (s *server) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
	ctx := stream.Context()
	for i := 1; i <= 5; i++ {
		user := &pb.User{
			Id:    int32(i),
			Name:  fmt.Sprintf("用户%d", i),
			Email: fmt.Sprintf("user%d@example.com", i),
		}
		if err := stream.Send(user); err != nil {
			return err
		}
		// Wait between sends, but stop early once the client has cancelled
		// or disconnected instead of sleeping unconditionally.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(500 * time.Millisecond):
		}
	}
	return nil
}
// BatchCreateUsers is the client-streaming RPC. It logs the name from each
// incoming request and counts them; when the client stops sending it replies
// once with the count and Success set to true. Any other receive error is
// returned unchanged.
func (s *server) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
	created := int32(0)
	for {
		req, err := stream.Recv()
		if err != nil {
			if err != io.EOF {
				return err
			}
			summary := &pb.BatchCreateResponse{
				Count:   created,
				Success: true,
			}
			return stream.SendAndClose(summary)
		}
		created++
		log.Printf("创建用户: %s", req.Name)
	}
}
// SyncUsers is the bidirectional-streaming RPC. Each received user has its
// Name prefixed and is sent straight back on the same stream. It returns nil
// when the client stops sending, or the first receive/send error otherwise.
func (s *server) SyncUsers(stream pb.UserService_SyncUsersServer) error {
	for {
		u, err := stream.Recv()
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		// Process the message and echo it back.
		u.Name = "已同步: " + u.Name
		if sendErr := stream.Send(u); sendErr != nil {
			return sendErr
		}
	}
}
func main() {
lis, _ := net.Listen("tcp", ":50051")
s := grpc.NewServer()
pb.RegisterUserServiceServer(s, &server{})
log.Println("服务器启动: :50051")
s.Serve(lis)
}六、使用场景
6.1 一元 RPC
- 简单的请求-响应
- CRUD 操作
- 认证登录
6.2 服务端流
- 大量数据返回
- 实时数据推送
- 日志流
6.3 客户端流
- 文件上传
- 批量数据提交
- 传感器数据收集
6.4 双向流
- 实时聊天
- 游戏通信
- 协同编辑
