跳转至

📖 gRPC 与微服务

学习时间: 约 7-8 小时 | 难度: ⭐⭐⭐⭐ 高级 | 前置知识: Go基础、接口、并发编程、Web开发

📚 章节概述

gRPC 是 Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers。Go 天然的并发优势使其成为构建微服务的首选语言之一。本章将从 protobuf 定义开始,深入 gRPC 四种通信模式、拦截器、错误处理,再延伸到微服务架构中的服务注册、发现、配置中心及 go-kratos 框架实践。

gRPC微服务拓扑图

上图展示了网关、服务实例与注册/配置基础设施之间的典型交互拓扑。

🎯 学习目标

  • 掌握 Protocol Buffers 3 语法和代码生成
  • 精通 gRPC 四种通信模式(Unary、Server Streaming、Client Streaming、Bidirectional)
  • 理解拦截器(Interceptor)的使用场景
  • 掌握 gRPC 错误处理和元数据传递
  • 了解微服务架构模式和 go-kratos 框架
  • 学会服务注册/发现与配置中心的集成

📌 概念:Protocol Buffers

1.1 protobuf 基础

Protocol Buffers(简称 protobuf)是 Google 开发的与语言无关、与平台无关的序列化数据格式。相比 JSON,protobuf 更小、更快、更高效。

Protocol Buffer
// user.proto
syntax = "proto3";

package user;
option go_package = "myproject/api/user/v1;userv1";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// 用户服务定义
service UserService {
    // 一元 RPC
    rpc GetUser(GetUserRequest) returns (GetUserResponse);
    rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
    rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
    rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);

    // 服务端流式 RPC
    rpc ListUsers(ListUsersRequest) returns (stream User);

    // 客户端流式 RPC
    rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateResponse);

    // 双向流式 RPC
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

// 消息定义
message User {
    uint64 id = 1;
    string name = 2;
    string email = 3;
    UserRole role = 4;
    google.protobuf.Timestamp created_at = 5;
    repeated string tags = 6;          // 重复字段 = 切片
    map<string, string> metadata = 7;  // 映射字段
    optional string avatar = 8;        // 可选字段
}

// 枚举
enum UserRole {
    USER_ROLE_UNSPECIFIED = 0;
    USER_ROLE_ADMIN = 1;
    USER_ROLE_USER = 2;
    USER_ROLE_GUEST = 3;
}

message GetUserRequest {
    uint64 id = 1;
}

message GetUserResponse {
    User user = 1;
}

message CreateUserRequest {
    string name = 1;
    string email = 2;
    UserRole role = 3;
}

message CreateUserResponse {
    User user = 1;
}

message UpdateUserRequest {
    uint64 id = 1;
    optional string name = 2;
    optional string email = 3;
}

message UpdateUserResponse {
    User user = 1;
}

message DeleteUserRequest {
    uint64 id = 1;
}

message ListUsersRequest {
    int32 page = 1;
    int32 page_size = 2;
}

message BatchCreateResponse {
    int32 created_count = 1;
}

message ChatMessage {
    string user_id = 1;
    string content = 2;
    google.protobuf.Timestamp timestamp = 3;
}

1.2 代码生成

Bash
# 安装 protoc 编译器和 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 生成代码
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       api/user/v1/user.proto

生成两个文件: - user.pb.go — 消息结构体和序列化方法 - user_grpc.pb.go — 服务接口和客户端代码


📌 概念:gRPC 四种通信模式

2.1 一元 RPC(Unary)

Go
// server.go — 实现服务端
package main

import (
    "context"
    "log"
    "net"
    "sync"

    pb "myproject/api/user/v1"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"
)

type userServer struct {
    pb.UnimplementedUserServiceServer
    mu    sync.RWMutex
    users map[uint64]*pb.User
    nextID uint64
}

func NewUserServer() *userServer {
    return &userServer{
        users:  make(map[uint64]*pb.User),
        nextID: 1,
    }
}

// 一元 RPC 实现
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    user, ok := s.users[req.Id]
    if !ok {
        // 使用 gRPC 标准错误码
        return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
    }

    return &pb.GetUserResponse{User: user}, nil
}

func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    // 参数验证
    if req.Name == "" {
        return nil, status.Error(codes.InvalidArgument, "用户名不能为空")
    }
    if req.Email == "" {
        return nil, status.Error(codes.InvalidArgument, "邮箱不能为空")
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    user := &pb.User{
        Id:        s.nextID,
        Name:      req.Name,
        Email:     req.Email,
        Role:      req.Role,
        CreatedAt: timestamppb.Now(),
    }
    s.users[s.nextID] = user
    s.nextID++

    return &pb.CreateUserResponse{User: user}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("无法监听: %v", err)
    }

    grpcServer := grpc.NewServer()
    pb.RegisterUserServiceServer(grpcServer, NewUserServer())

    log.Println("gRPC 服务器启动在 :50051")
    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("启动失败: %v", err)
    }
}
Go
// client.go — 客户端调用
package main

import (
    "context"
    "log"
    "time"

    pb "myproject/api/user/v1"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/status"
)

func main() {
    // 建立连接
    conn, err := grpc.NewClient("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        log.Fatalf("连接失败: %v", err)
    }
    defer conn.Close()

    client := pb.NewUserServiceClient(conn)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 创建用户
    createResp, err := client.CreateUser(ctx, &pb.CreateUserRequest{
        Name:  "Alice",
        Email: "alice@example.com",
        Role:  pb.UserRole_USER_ROLE_USER,
    })
    if err != nil {
        st, ok := status.FromError(err)
        if ok {
            log.Printf("gRPC 错误: code=%s, msg=%s", st.Code(), st.Message())
        }
        log.Fatalf("创建失败: %v", err)
    }
    log.Printf("创建成功: %v", createResp.User)

    // 查询用户
    getResp, err := client.GetUser(ctx, &pb.GetUserRequest{
        Id: createResp.User.Id,
    })
    if err != nil {
        log.Fatalf("查询失败: %v", err)
    }
    log.Printf("查询结果: %v", getResp.User)
}

2.2 服务端流式 RPC

Go
// 服务端实现 — 服务端流
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for _, user := range s.users {
        // 通过 stream.Send 逐个发送
        if err := stream.Send(user); err != nil {
            return status.Errorf(codes.Internal, "发送失败: %v", err)
        }
        // 实际场景可能从数据库游标读取
    }
    return nil // 返回 nil 自动关闭流
}

// 客户端 — 接收服务端流
func listUsers(client pb.UserServiceClient) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
        Page:     1,
        PageSize: 100,
    })
    if err != nil {
        log.Fatalf("ListUsers 调用失败: %v", err)
    }

    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break // 流结束
        }
        if err != nil {
            log.Fatalf("接收失败: %v", err)
        }
        log.Printf("收到用户: %s", user.Name)
    }
}

2.3 客户端流式 RPC

Go
// 服务端实现 — 接收客户端流
func (s *userServer) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
    var count int32

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            // 客户端发送完毕,返回响应
            return stream.SendAndClose(&pb.BatchCreateResponse{
                CreatedCount: count,
            })
        }
        if err != nil {
            return status.Errorf(codes.Internal, "接收失败: %v", err)
        }

        // 处理每条数据
        s.mu.Lock()
        s.users[s.nextID] = &pb.User{
            Id:        s.nextID,
            Name:      req.Name,
            Email:     req.Email,
            CreatedAt: timestamppb.Now(),
        }
        s.nextID++
        s.mu.Unlock()
        count++
    }
}

// 客户端 — 发送流
func batchCreate(client pb.UserServiceClient) {
    stream, err := client.BatchCreateUsers(context.Background())
    if err != nil {
        log.Fatalf("创建流失败: %v", err)
    }

    users := []struct{ Name, Email string }{
        {"Alice", "alice@example.com"},
        {"Bob", "bob@example.com"},
        {"Charlie", "charlie@example.com"},
    }

    for _, u := range users {
        if err := stream.Send(&pb.CreateUserRequest{
            Name:  u.Name,
            Email: u.Email,
        }); err != nil {
            log.Fatalf("发送失败: %v", err)
        }
    }

    resp, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("关闭流失败: %v", err)
    }
    log.Printf("批量创建 %d 个用户", resp.CreatedCount)
}

2.4 双向流式 RPC

Go
// 服务端 — 双向流
func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return status.Errorf(codes.Internal, "接收失败: %v", err)
        }

        log.Printf("[%s] %s", msg.UserId, msg.Content)

        // 回复消息
        reply := &pb.ChatMessage{
            UserId:    "server",
            Content:   fmt.Sprintf("收到: %s", msg.Content),
            Timestamp: timestamppb.Now(),
        }
        if err := stream.Send(reply); err != nil {
            return status.Errorf(codes.Internal, "发送失败: %v", err)
        }
    }
}

// 客户端 — 双向流
func chat(client pb.UserServiceClient) {
    stream, err := client.Chat(context.Background())
    if err != nil {
        log.Fatalf("创建流失败: %v", err)
    }

    // 启动接收协程
    go func() {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                log.Printf("接收错误: %v", err)
                return
            }
            log.Printf("收到: [%s] %s", msg.UserId, msg.Content)
        }
    }()

    // 发送消息
    messages := []string{"你好", "gRPC双向流", "真的很酷"}
    for _, msg := range messages {
        if err := stream.Send(&pb.ChatMessage{
            UserId:    "client",
            Content:   msg,
            Timestamp: timestamppb.Now(),
        }); err != nil {
            log.Fatalf("发送失败: %v", err)
        }
        time.Sleep(time.Second)
    }

    stream.CloseSend()
    time.Sleep(2 * time.Second) // 等待接收最后的消息
}

📌 概念:拦截器(Interceptor)

3.1 一元拦截器

Go
// 服务端一元拦截器
func UnaryServerInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    start := time.Now()

    // 前置处理
    log.Printf("请求: %s", info.FullMethod)

    // 调用实际处理函数
    resp, err := handler(ctx, req)

    // 后置处理
    duration := time.Since(start)
    if err != nil {
        log.Printf("失败: %s [%s] %v", info.FullMethod, duration, err)
    } else {
        log.Printf("成功: %s [%s]", info.FullMethod, duration)
    }

    return resp, err
}

// 认证拦截器
func AuthInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    // 跳过不需要认证的方法
    if info.FullMethod == "/user.UserService/Login" {
        return handler(ctx, req)
    }

    // 从元数据获取 token
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "缺少元数据")
    }

    tokens := md.Get("authorization")
    if len(tokens) == 0 {
        return nil, status.Error(codes.Unauthenticated, "缺少 token")
    }

    // 验证 token
    userID, err := validateToken(tokens[0])
    if err != nil {
        return nil, status.Error(codes.Unauthenticated, "token 无效")
    }

    // 注入用户信息到 context
    // ⚠️ context key 必须用自定义类型,不能用 string/int 等内置类型
    //    否则不同包使用相同字符串会导致 key 冲突
    type contextKey string
    const userIDKey contextKey = "user_id"
    newCtx := context.WithValue(ctx, userIDKey, userID)
    return handler(newCtx, req)
}

// Recovery 拦截器
func RecoveryInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (resp any, err error) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("panic 恢复: %v\n%s", r, debug.Stack())
            err = status.Errorf(codes.Internal, "内部错误")
        }
    }()
    return handler(ctx, req)
}

// 注册拦截器
func main() {
    grpcServer := grpc.NewServer(
        grpc.ChainUnaryInterceptor(   // 链式拦截器
            RecoveryInterceptor,
            UnaryServerInterceptor,
            AuthInterceptor,
        ),
    )
    // ...
}

3.2 流式拦截器

Go
// 流式服务端拦截器
func StreamServerInterceptor(
    srv any,
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    start := time.Now()
    log.Printf("流请求: %s (Client=%v, Server=%v)",
        info.FullMethod, info.IsClientStream, info.IsServerStream)

    err := handler(srv, ss)

    log.Printf("流结束: %s [%s] err=%v",
        info.FullMethod, time.Since(start), err)
    return err
}

// 客户端拦截器
func UnaryClientInterceptor(
    ctx context.Context,
    method string,
    req, reply any,
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts ...grpc.CallOption,
) error {
    // 自动注入 token
    md := metadata.Pairs("authorization", "Bearer "+getToken())
    ctx = metadata.NewOutgoingContext(ctx, md)

    start := time.Now()
    err := invoker(ctx, method, req, reply, cc, opts...)
    log.Printf("客户端调用 %s [%s]", method, time.Since(start))
    return err
}

📌 概念:gRPC 错误处理与元数据

4.1 丰富的错误信息

Go
import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/genproto/googleapis/rpc/errdetails"
)

// 返回带详细信息的错误
func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    // 输入验证
    var violations []*errdetails.BadRequest_FieldViolation
    if req.Name == "" {
        violations = append(violations, &errdetails.BadRequest_FieldViolation{
            Field:       "name",
            Description: "用户名不能为空",
        })
    }
    if req.Email == "" {
        violations = append(violations, &errdetails.BadRequest_FieldViolation{
            Field:       "email",
            Description: "邮箱不能为空",
        })
    }

    if len(violations) > 0 {
        st := status.New(codes.InvalidArgument, "参数验证失败")
        br := &errdetails.BadRequest{FieldViolations: violations}
        st, _ = st.WithDetails(br)
        return nil, st.Err()
    }

    // ...
}

// 客户端解析错误详情
func handleError(err error) {
    st, ok := status.FromError(err)
    if !ok {
        log.Printf("非 gRPC 错误: %v", err)
        return
    }

    log.Printf("错误码: %s, 消息: %s", st.Code(), st.Message())

    for _, detail := range st.Details() {
        switch d := detail.(type) {
        case *errdetails.BadRequest:
            for _, v := range d.FieldViolations {
                log.Printf("  字段 %s: %s", v.Field, v.Description)
            }
        case *errdetails.RetryInfo:
            log.Printf("  建议 %s 后重试", d.RetryDelay.AsDuration())
        }
    }
}

4.2 元数据传递

Go
// 客户端发送元数据
func callWithMetadata(client pb.UserServiceClient) {
    md := metadata.New(map[string]string{
        "authorization": "Bearer my-token",
        "x-request-id":  "req-123",
        "x-trace-id":    "trace-456",
    })
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // 接收服务端返回的元数据
    var header, trailer metadata.MD
    resp, err := client.GetUser(ctx,
        &pb.GetUserRequest{Id: 1},
        grpc.Header(&header),
        grpc.Trailer(&trailer),
    )
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("响应: %v", resp)
    log.Printf("Header: %v", header)
    log.Printf("Trailer: %v", trailer)
}

// 服务端读取/设置元数据
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    // 读取客户端元数据
    md, ok := metadata.FromIncomingContext(ctx)
    if ok {
        if traceIDs := md.Get("x-trace-id"); len(traceIDs) > 0 {
            log.Printf("Trace ID: %s", traceIDs[0])
        }
    }

    // 设置响应元数据
    header := metadata.Pairs("x-server-region", "cn-east")
    grpc.SetHeader(ctx, header)

    trailer := metadata.Pairs("x-processing-time", "50ms")
    grpc.SetTrailer(ctx, trailer)

    // ...
    return &pb.GetUserResponse{}, nil
}

📌 概念:微服务架构

5.1 架构概览

Text Only
                    ┌──────────────┐
                    │   API 网关    │
                    └──────┬───────┘
             ┌─────────────┼─────────────┐
             ▼             ▼             ▼
      ┌──────────┐  ┌──────────┐  ┌──────────┐
      │ 用户服务  │  │ 订单服务  │  │ 商品服务  │
      └────┬─────┘  └────┬─────┘  └────┬─────┘
           │              │              │
      ┌────▼─────┐  ┌────▼─────┐  ┌────▼─────┐
      │  MySQL   │  │  MySQL   │  │  Redis   │
      └──────────┘  └──────────┘  └──────────┘

      ┌────────────────────────────────────────┐
      │         服务注册与发现 (etcd/consul)      │
      └────────────────────────────────────────┘
      ┌────────────────────────────────────────┐
      │         配置中心 (nacos/apollo)          │
      └────────────────────────────────────────┘

5.2 服务注册与发现(etcd)

Go
import (
    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/naming/endpoints"
)

// 服务注册
func registerService(etcdClient *clientv3.Client, serviceName, addr string) error {
    em, err := endpoints.NewManager(etcdClient, serviceName)
    if err != nil {
        return err
    }

    // 创建租约(心跳保活)
    lease, err := etcdClient.Grant(context.Background(), 10)
    if err != nil {
        return err
    }

    // 注册服务端点
    key := fmt.Sprintf("%s/%s", serviceName, addr)
    err = em.AddEndpoint(context.Background(), key,
        endpoints.Endpoint{Addr: addr},
        clientv3.WithLease(lease.ID),
    )
    if err != nil {
        return err
    }

    // 保持租约活跃
    ch, err := etcdClient.KeepAlive(context.Background(), lease.ID)
    if err != nil {
        return err
    }

    go func() {
        for range ch {
            // 保持心跳
        }
        log.Println("KeepAlive 通道关闭")
    }()

    return nil
}

// 服务发现 — 客户端负载均衡
import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    etcdresolver "go.etcd.io/etcd/client/v3/naming/resolver"
)

func newClientWithDiscovery(etcdClient *clientv3.Client, serviceName string) (*grpc.ClientConn, error) {
    // 创建 etcd 解析器
    builder, err := etcdresolver.NewBuilder(etcdClient)
    if err != nil {
        return nil, err
    }

    conn, err := grpc.NewClient(
        fmt.Sprintf("etcd:///%s", serviceName),
        grpc.WithResolvers(builder),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    return conn, err
}

5.3 配置中心模式

Go
// 配置结构
type Config struct {
    Server   ServerConfig   `yaml:"server"`
    Database DatabaseConfig `yaml:"database"`
    Redis    RedisConfig    `yaml:"redis"`
}

type ServerConfig struct {
    HTTP HTTPConfig `yaml:"http"`
    GRPC GRPCConfig `yaml:"grpc"`
}

type HTTPConfig struct {
    Addr    string        `yaml:"addr"`
    Timeout time.Duration `yaml:"timeout"`
}

type GRPCConfig struct {
    Addr    string        `yaml:"addr"`
    Timeout time.Duration `yaml:"timeout"`
}

// 从 etcd 加载配置
func loadConfigFromEtcd(client *clientv3.Client, key string) (*Config, error) {
    resp, err := client.Get(context.Background(), key)
    if err != nil {
        return nil, err
    }
    if len(resp.Kvs) == 0 {
        return nil, fmt.Errorf("配置 %s 不存在", key)
    }

    var cfg Config
    if err := yaml.Unmarshal(resp.Kvs[0].Value, &cfg); err != nil {
        return nil, err
    }
    return &cfg, nil
}

// 监听配置变更
func watchConfig(client *clientv3.Client, key string, onChange func(*Config)) {
    wch := client.Watch(context.Background(), key)
    for resp := range wch {
        for _, ev := range resp.Events {
            if ev.Type == clientv3.EventTypePut {
                var cfg Config
                if err := yaml.Unmarshal(ev.Kv.Value, &cfg); err != nil {
                    log.Printf("配置解析失败: %v", err)
                    continue
                }
                onChange(&cfg)
            }
        }
    }
}

📌 概念:go-kratos 框架

6.1 项目初始化

Bash
# 安装 kratos CLI
go install github.com/go-kratos/kratos/cmd/kratos/v2@latest

# 创建项目
kratos new myapp

# 项目结构
myapp/
├── api/                  # protobuf 定义
   └── helloworld/
       └── v1/
           └── greeter.proto
├── cmd/                  # 入口
   └── myapp/
       ├── main.go
       └── wire.go       # 依赖注入
├── configs/              # 配置文件
   └── config.yaml
├── internal/             # 内部代码
   ├── biz/              # 业务逻辑
   ├── data/             # 数据访问
   ├── server/           # 服务器(HTTP + gRPC)
   └── service/          # 服务实现
├── third_party/          # 第三方 proto
├── Makefile
└── go.mod

6.2 Kratos 服务实现

Go
// internal/service/user.go
package service

import (
    "context"
    pb "myapp/api/user/v1"
    "myapp/internal/biz"
)

type UserService struct {
    pb.UnimplementedUserServiceServer
    uc *biz.UserUseCase
}

func NewUserService(uc *biz.UserUseCase) *UserService {
    return &UserService{uc: uc}
}

func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    user, err := s.uc.Get(ctx, req.Id)
    if err != nil {
        return nil, err
    }
    return &pb.GetUserResponse{
        User: &pb.User{
            Id:   user.ID,
            Name: user.Name,
        },
    }, nil
}
Go
// internal/biz/user.go — 业务逻辑层(Clean Architecture 的 UseCase)
package biz

import "context"

type User struct {
    ID    uint64
    Name  string
    Email string
}

// 仓库接口(由 data 层实现)
type UserRepo interface {
    GetByID(ctx context.Context, id uint64) (*User, error)
    Create(ctx context.Context, user *User) error
    Update(ctx context.Context, user *User) error
    Delete(ctx context.Context, id uint64) error
}

type UserUseCase struct {
    repo UserRepo
    log  *log.Helper
}

func NewUserUseCase(repo UserRepo, logger log.Logger) *UserUseCase {
    return &UserUseCase{
        repo: repo,
        log:  log.NewHelper(logger),
    }
}

func (uc *UserUseCase) Get(ctx context.Context, id uint64) (*User, error) {
    user, err := uc.repo.GetByID(ctx, id)
    if err != nil {
        uc.log.WithContext(ctx).Errorf("查询用户失败: %v", err)
        return nil, err
    }
    return user, nil
}
Go
// internal/server/grpc.go — gRPC 服务器注册
package server

import (
    "myapp/internal/service"
    pb "myapp/api/user/v1"

    "github.com/go-kratos/kratos/v2/transport/grpc"
)

func NewGRPCServer(c *conf.Server, userSvc *service.UserService) *grpc.Server {
    opts := []grpc.ServerOption{
        grpc.Address(c.Grpc.Addr),
        grpc.Timeout(c.Grpc.Timeout.AsDuration()),
    }
    srv := grpc.NewServer(opts...)
    pb.RegisterUserServiceServer(srv, userSvc)
    return srv
}
Go
// cmd/myapp/main.go — 入口
package main

import (
    "github.com/go-kratos/kratos/v2"
    "github.com/go-kratos/kratos/v2/transport/grpc"
    "github.com/go-kratos/kratos/v2/transport/http"
)

func newApp(gs *grpc.Server, hs *http.Server) *kratos.App {
    return kratos.New(
        kratos.Name("myapp"),
        kratos.Server(gs, hs),
    )
}

func main() {
    // wire 注入
    app, cleanup, err := wireApp(...)
    if err != nil {
        panic(err)
    }
    defer cleanup()

    if err := app.Run(); err != nil {
        panic(err)
    }
}

💻 代码示例:完整微服务项目

连接池与健康检查

Go
import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/health"
    "google.golang.org/grpc/health/grpc_health_v1"
    "google.golang.org/grpc/keepalive"
)

func newGRPCServer() *grpc.Server {
    srv := grpc.NewServer(
        // Keep-alive 参数
        grpc.KeepaliveParams(keepalive.ServerParameters{
            MaxConnectionIdle:     15 * time.Minute,
            MaxConnectionAge:      30 * time.Minute,
            MaxConnectionAgeGrace: 5 * time.Second,
            Time:                  5 * time.Minute,
            Timeout:               1 * time.Second,
        }),
        // 拦截器链
        grpc.ChainUnaryInterceptor(
            RecoveryInterceptor,
            UnaryServerInterceptor,
        ),
        // 最大消息大小
        grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
    )

    // 注册健康检查服务
    healthSrv := health.NewServer()
    grpc_health_v1.RegisterHealthServer(srv, healthSrv)
    healthSrv.SetServingStatus("user.UserService", grpc_health_v1.HealthCheckResponse_SERVING)

    return srv
}

// 客户端连接配置
func newClientConn(addr string) (*grpc.ClientConn, error) {
    return grpc.NewClient(addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                10 * time.Second,
            Timeout:             3 * time.Second,
            PermitWithoutStream: true,
        }),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(10*1024*1024),
        ),
    )
}

✅ 最佳实践

  1. proto 文件版本化: 使用 api/v1/ 路径管理 API 版本,避免破坏变更
  2. 使用 gRPC 标准错误码: 建立 codes.NotFoundcodes.InvalidArgument 等标准映射
  3. 拦截器分层: Recovery → 日志 → 认证 → 限流 → 业务
  4. Context 传递: 始终传递 ctx,用于超时、取消和链路追踪
  5. 优雅关停: GracefulStop() 替代 Stop(),等待进行中的 RPC 完成
  6. 健康检查: 实现 gRPC Health Checking Protocol,配合 Kubernetes 就绪探针
  7. 幂等设计: 客户端重试时服务端必须保证幂等性

🎯 面试题

Q1: gRPC 和 REST 的主要区别是什么?

A: 1) 协议:gRPC 基于 HTTP/2,REST 通常基于 HTTP/1.1;2) 编码:gRPC 使用 protobuf(二进制),REST 使用 JSON(文本);3) 性能:gRPC 序列化/反序列化更快、传输更小;4) 流式:gRPC 原生支持四种流模式,REST 需额外方案;5) 代码生成:gRPC 强类型代码生成 vs REST 手写客户端;6) 浏览器支持:REST 更好,gRPC 需要 gRPC-Web 适配;7) 适用场景:gRPC 适合内部微服务通信,REST 适合公开 API。

Q2: gRPC 的四种通信模式分别适合什么场景?

A: 1) Unary(一元): 常规请求-响应,如 CRUD;2) Server Streaming(服务端流): 大量数据下行,如日志推送、报表下载;3) Client Streaming(客户端流): 大量数据上行,如文件上传、批量导入;4) Bidirectional Streaming(双向流): 实时交互,如聊天、游戏同步、实时监控。

Q3: 什么是 gRPC 拦截器?和 HTTP 中间件有什么区别?

A: gRPC 拦截器类似 HTTP 中间件,是在 RPC 调用前后执行的钩子函数。gRPC 分为 Unary Interceptor(一元)和 Stream Interceptor(流式)两种。区别在于:gRPC 拦截器分客户端/服务端两侧,可通过 grpc.ChainUnaryInterceptor 链式组合,处理的是 protobuf 消息而非 HTTP 请求。常见用途:认证、日志、限流、Recovery、链路追踪。

Q4: 微服务中服务发现有哪些模式?

A: 1) 客户端发现: 客户端查询注册中心获取服务列表并自行负载均衡(如 etcd + gRPC resolver);2) 服务端发现: 客户端请求负载均衡器,由它查询注册中心(如 Nginx, Kubernetes Service);3) DNS 发现: 基于 DNS SRV 记录;4) 主流方案:etcd(强一致性 Raft共识)、Consul(支持健康检查)、Nacos(阿里巴巴,Spring Cloud 生态常用)。

Q5: go-kratos 框架的分层架构有什么优势?

A: Kratos 遵循 Clean Architecture 分为四层:1) Server 层: HTTP/gRPC 传输层,模板化;2) Service 层: 接口转换,protobuf ↔ 业务对象;3) Biz 层: 核心业务逻辑,定义 Repository 接口(依赖倒置);4) Data 层: 数据访问实现。优势:业务逻辑不依赖框架/数据库,可独立测试,替换数据存储只需改 Data 层。使用 Wire 做编译期依赖注入,避免反射开销。

Q6: gRPC 的 status 包如何实现丰富的错误处理?

A: google.golang.org/grpc/status 提供标准错误码(17个,如 NotFound, InvalidArgument, Internal 等)。通过 status.New(code, msg) 创建错误,用 WithDetails() 附加结构化信息(如 errdetails.BadRequest 的字段验证错误、errdetails.RetryInfo 的重试建议等)。客户端用 status.FromError() 解析错误码和详情。这比 HTTP 状态码更精细,是微服务间错误传递的最佳实践。


📋 学习检查清单

  • 能编写 protobuf 文件并生成 Go 代码
  • 能实现 gRPC 的四种通信模式
  • 理解并能编写服务端/客户端拦截器
  • 掌握 gRPC 错误处理和元数据传递
  • 了解微服务架构中的服务注册/发现
  • 理解配置中心的工作原理
  • 了解 go-kratos 框架的分层架构
  • 能搭建完整的 gRPC 微服务项目

上一章: 实战项目 | 下一章: 数据库操作