📖 gRPC 与微服务¶
学习时间: 约 7-8 小时 | 难度: ⭐⭐⭐⭐ 高级 | 前置知识: Go基础、接口、并发编程、Web开发
📚 章节概述¶
gRPC 是 Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers。Go 天然的并发优势使其成为构建微服务的首选语言之一。本章将从 protobuf 定义开始,深入 gRPC 四种通信模式、拦截器、错误处理,再延伸到微服务架构中的服务注册、发现、配置中心及 go-kratos 框架实践。
上图展示了网关、服务实例与注册/配置基础设施之间的典型交互拓扑。
🎯 学习目标¶
- 掌握 Protocol Buffers 3 语法和代码生成
- 精通 gRPC 四种通信模式(Unary、Server Streaming、Client Streaming、Bidirectional)
- 理解拦截器(Interceptor)的使用场景
- 掌握 gRPC 错误处理和元数据传递
- 了解微服务架构模式和 go-kratos 框架
- 学会服务注册/发现与配置中心的集成
📌 概念:Protocol Buffers¶
1.1 protobuf 基础¶
Protocol Buffers(简称 protobuf)是 Google 开发的与语言无关、与平台无关的序列化数据格式。相比 JSON,protobuf 更小、更快、更高效。
// user.proto
syntax = "proto3";
package user;
option go_package = "myproject/api/user/v1;userv1";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// 用户服务定义
service UserService {
// 一元 RPC
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
// 服务端流式 RPC
rpc ListUsers(ListUsersRequest) returns (stream User);
// 客户端流式 RPC
rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateResponse);
// 双向流式 RPC
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
// 消息定义
message User {
uint64 id = 1;
string name = 2;
string email = 3;
UserRole role = 4;
google.protobuf.Timestamp created_at = 5;
repeated string tags = 6; // 重复字段 = 切片
map<string, string> metadata = 7; // 映射字段
optional string avatar = 8; // 可选字段
}
// 枚举
enum UserRole {
USER_ROLE_UNSPECIFIED = 0;
USER_ROLE_ADMIN = 1;
USER_ROLE_USER = 2;
USER_ROLE_GUEST = 3;
}
message GetUserRequest {
uint64 id = 1;
}
message GetUserResponse {
User user = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
UserRole role = 3;
}
message CreateUserResponse {
User user = 1;
}
message UpdateUserRequest {
uint64 id = 1;
optional string name = 2;
optional string email = 3;
}
message UpdateUserResponse {
User user = 1;
}
message DeleteUserRequest {
uint64 id = 1;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message BatchCreateResponse {
int32 created_count = 1;
}
message ChatMessage {
string user_id = 1;
string content = 2;
google.protobuf.Timestamp timestamp = 3;
}
1.2 代码生成¶
# 安装 protoc 编译器和 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 生成代码
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
api/user/v1/user.proto
生成两个文件: - user.pb.go — 消息结构体和序列化方法 - user_grpc.pb.go — 服务接口和客户端代码
📌 概念:gRPC 四种通信模式¶
2.1 一元 RPC(Unary)¶
// server.go — 实现服务端
package main
import (
"context"
"log"
"net"
"sync"
pb "myproject/api/user/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
type userServer struct {
pb.UnimplementedUserServiceServer
mu sync.RWMutex
users map[uint64]*pb.User
nextID uint64
}
func NewUserServer() *userServer {
return &userServer{
users: make(map[uint64]*pb.User),
nextID: 1,
}
}
// 一元 RPC 实现
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
user, ok := s.users[req.Id]
if !ok {
// 使用 gRPC 标准错误码
return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
}
return &pb.GetUserResponse{User: user}, nil
}
func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
// 参数验证
if req.Name == "" {
return nil, status.Error(codes.InvalidArgument, "用户名不能为空")
}
if req.Email == "" {
return nil, status.Error(codes.InvalidArgument, "邮箱不能为空")
}
s.mu.Lock()
defer s.mu.Unlock()
user := &pb.User{
Id: s.nextID,
Name: req.Name,
Email: req.Email,
Role: req.Role,
CreatedAt: timestamppb.Now(),
}
s.users[s.nextID] = user
s.nextID++
return &pb.CreateUserResponse{User: user}, nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("无法监听: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterUserServiceServer(grpcServer, NewUserServer())
log.Println("gRPC 服务器启动在 :50051")
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("启动失败: %v", err)
}
}
// client.go — 客户端调用
package main
import (
"context"
"log"
"time"
pb "myproject/api/user/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
func main() {
// 建立连接
conn, err := grpc.NewClient("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建用户
createResp, err := client.CreateUser(ctx, &pb.CreateUserRequest{
Name: "Alice",
Email: "alice@example.com",
Role: pb.UserRole_USER_ROLE_USER,
})
if err != nil {
st, ok := status.FromError(err)
if ok {
log.Printf("gRPC 错误: code=%s, msg=%s", st.Code(), st.Message())
}
log.Fatalf("创建失败: %v", err)
}
log.Printf("创建成功: %v", createResp.User)
// 查询用户
getResp, err := client.GetUser(ctx, &pb.GetUserRequest{
Id: createResp.User.Id,
})
if err != nil {
log.Fatalf("查询失败: %v", err)
}
log.Printf("查询结果: %v", getResp.User)
}
2.2 服务端流式 RPC¶
// 服务端实现 — 服务端流
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, user := range s.users {
// 通过 stream.Send 逐个发送
if err := stream.Send(user); err != nil {
return status.Errorf(codes.Internal, "发送失败: %v", err)
}
// 实际场景可能从数据库游标读取
}
return nil // 返回 nil 自动关闭流
}
// 客户端 — 接收服务端流
func listUsers(client pb.UserServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
Page: 1,
PageSize: 100,
})
if err != nil {
log.Fatalf("ListUsers 调用失败: %v", err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break // 流结束
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
log.Printf("收到用户: %s", user.Name)
}
}
2.3 客户端流式 RPC¶
// 服务端实现 — 接收客户端流
func (s *userServer) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
var count int32
for {
req, err := stream.Recv()
if err == io.EOF {
// 客户端发送完毕,返回响应
return stream.SendAndClose(&pb.BatchCreateResponse{
CreatedCount: count,
})
}
if err != nil {
return status.Errorf(codes.Internal, "接收失败: %v", err)
}
// 处理每条数据
s.mu.Lock()
s.users[s.nextID] = &pb.User{
Id: s.nextID,
Name: req.Name,
Email: req.Email,
CreatedAt: timestamppb.Now(),
}
s.nextID++
s.mu.Unlock()
count++
}
}
// 客户端 — 发送流
func batchCreate(client pb.UserServiceClient) {
stream, err := client.BatchCreateUsers(context.Background())
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
users := []struct{ Name, Email string }{
{"Alice", "alice@example.com"},
{"Bob", "bob@example.com"},
{"Charlie", "charlie@example.com"},
}
for _, u := range users {
if err := stream.Send(&pb.CreateUserRequest{
Name: u.Name,
Email: u.Email,
}); err != nil {
log.Fatalf("发送失败: %v", err)
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("关闭流失败: %v", err)
}
log.Printf("批量创建 %d 个用户", resp.CreatedCount)
}
2.4 双向流式 RPC¶
// 服务端 — 双向流
func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return status.Errorf(codes.Internal, "接收失败: %v", err)
}
log.Printf("[%s] %s", msg.UserId, msg.Content)
// 回复消息
reply := &pb.ChatMessage{
UserId: "server",
Content: fmt.Sprintf("收到: %s", msg.Content),
Timestamp: timestamppb.Now(),
}
if err := stream.Send(reply); err != nil {
return status.Errorf(codes.Internal, "发送失败: %v", err)
}
}
}
// 客户端 — 双向流
func chat(client pb.UserServiceClient) {
stream, err := client.Chat(context.Background())
if err != nil {
log.Fatalf("创建流失败: %v", err)
}
// 启动接收协程
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("接收错误: %v", err)
return
}
log.Printf("收到: [%s] %s", msg.UserId, msg.Content)
}
}()
// 发送消息
messages := []string{"你好", "gRPC双向流", "真的很酷"}
for _, msg := range messages {
if err := stream.Send(&pb.ChatMessage{
UserId: "client",
Content: msg,
Timestamp: timestamppb.Now(),
}); err != nil {
log.Fatalf("发送失败: %v", err)
}
time.Sleep(time.Second)
}
stream.CloseSend()
time.Sleep(2 * time.Second) // 等待接收最后的消息
}
📌 概念:拦截器(Interceptor)¶
3.1 一元拦截器¶
// 服务端一元拦截器
func UnaryServerInterceptor(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
start := time.Now()
// 前置处理
log.Printf("请求: %s", info.FullMethod)
// 调用实际处理函数
resp, err := handler(ctx, req)
// 后置处理
duration := time.Since(start)
if err != nil {
log.Printf("失败: %s [%s] %v", info.FullMethod, duration, err)
} else {
log.Printf("成功: %s [%s]", info.FullMethod, duration)
}
return resp, err
}
// 认证拦截器
func AuthInterceptor(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
// 跳过不需要认证的方法
if info.FullMethod == "/user.UserService/Login" {
return handler(ctx, req)
}
// 从元数据获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少元数据")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少 token")
}
// 验证 token
userID, err := validateToken(tokens[0])
if err != nil {
return nil, status.Error(codes.Unauthenticated, "token 无效")
}
// 注入用户信息到 context
// ⚠️ context key 必须用自定义类型,不能用 string/int 等内置类型
// 否则不同包使用相同字符串会导致 key 冲突
type contextKey string
const userIDKey contextKey = "user_id"
newCtx := context.WithValue(ctx, userIDKey, userID)
return handler(newCtx, req)
}
// Recovery 拦截器
func RecoveryInterceptor(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp any, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("panic 恢复: %v\n%s", r, debug.Stack())
err = status.Errorf(codes.Internal, "内部错误")
}
}()
return handler(ctx, req)
}
// 注册拦截器
func main() {
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor( // 链式拦截器
RecoveryInterceptor,
UnaryServerInterceptor,
AuthInterceptor,
),
)
// ...
}
3.2 流式拦截器¶
// 流式服务端拦截器
func StreamServerInterceptor(
srv any,
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
start := time.Now()
log.Printf("流请求: %s (Client=%v, Server=%v)",
info.FullMethod, info.IsClientStream, info.IsServerStream)
err := handler(srv, ss)
log.Printf("流结束: %s [%s] err=%v",
info.FullMethod, time.Since(start), err)
return err
}
// 客户端拦截器
func UnaryClientInterceptor(
ctx context.Context,
method string,
req, reply any,
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// 自动注入 token
md := metadata.Pairs("authorization", "Bearer "+getToken())
ctx = metadata.NewOutgoingContext(ctx, md)
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("客户端调用 %s [%s]", method, time.Since(start))
return err
}
📌 概念:gRPC 错误处理与元数据¶
4.1 丰富的错误信息¶
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/genproto/googleapis/rpc/errdetails"
)
// 返回带详细信息的错误
func (s *userServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
// 输入验证
var violations []*errdetails.BadRequest_FieldViolation
if req.Name == "" {
violations = append(violations, &errdetails.BadRequest_FieldViolation{
Field: "name",
Description: "用户名不能为空",
})
}
if req.Email == "" {
violations = append(violations, &errdetails.BadRequest_FieldViolation{
Field: "email",
Description: "邮箱不能为空",
})
}
if len(violations) > 0 {
st := status.New(codes.InvalidArgument, "参数验证失败")
br := &errdetails.BadRequest{FieldViolations: violations}
st, _ = st.WithDetails(br)
return nil, st.Err()
}
// ...
}
// 客户端解析错误详情
func handleError(err error) {
st, ok := status.FromError(err)
if !ok {
log.Printf("非 gRPC 错误: %v", err)
return
}
log.Printf("错误码: %s, 消息: %s", st.Code(), st.Message())
for _, detail := range st.Details() {
switch d := detail.(type) {
case *errdetails.BadRequest:
for _, v := range d.FieldViolations {
log.Printf(" 字段 %s: %s", v.Field, v.Description)
}
case *errdetails.RetryInfo:
log.Printf(" 建议 %s 后重试", d.RetryDelay.AsDuration())
}
}
}
4.2 元数据传递¶
// 客户端发送元数据
func callWithMetadata(client pb.UserServiceClient) {
md := metadata.New(map[string]string{
"authorization": "Bearer my-token",
"x-request-id": "req-123",
"x-trace-id": "trace-456",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 接收服务端返回的元数据
var header, trailer metadata.MD
resp, err := client.GetUser(ctx,
&pb.GetUserRequest{Id: 1},
grpc.Header(&header),
grpc.Trailer(&trailer),
)
if err != nil {
log.Fatal(err)
}
log.Printf("响应: %v", resp)
log.Printf("Header: %v", header)
log.Printf("Trailer: %v", trailer)
}
// 服务端读取/设置元数据
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
// 读取客户端元数据
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if traceIDs := md.Get("x-trace-id"); len(traceIDs) > 0 {
log.Printf("Trace ID: %s", traceIDs[0])
}
}
// 设置响应元数据
header := metadata.Pairs("x-server-region", "cn-east")
grpc.SetHeader(ctx, header)
trailer := metadata.Pairs("x-processing-time", "50ms")
grpc.SetTrailer(ctx, trailer)
// ...
return &pb.GetUserResponse{}, nil
}
📌 概念:微服务架构¶
5.1 架构概览¶
┌──────────────┐
│ API 网关 │
└──────┬───────┘
┌─────────────┼─────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 用户服务 │ │ 订单服务 │ │ 商品服务 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
┌────▼─────┐ ┌────▼─────┐ ┌────▼─────┐
│ MySQL │ │ MySQL │ │ Redis │
└──────────┘ └──────────┘ └──────────┘
┌────────────────────────────────────────┐
│ 服务注册与发现 (etcd/consul) │
└────────────────────────────────────────┘
┌────────────────────────────────────────┐
│ 配置中心 (nacos/apollo) │
└────────────────────────────────────────┘
5.2 服务注册与发现(etcd)¶
import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
)
// 服务注册
func registerService(etcdClient *clientv3.Client, serviceName, addr string) error {
em, err := endpoints.NewManager(etcdClient, serviceName)
if err != nil {
return err
}
// 创建租约(心跳保活)
lease, err := etcdClient.Grant(context.Background(), 10)
if err != nil {
return err
}
// 注册服务端点
key := fmt.Sprintf("%s/%s", serviceName, addr)
err = em.AddEndpoint(context.Background(), key,
endpoints.Endpoint{Addr: addr},
clientv3.WithLease(lease.ID),
)
if err != nil {
return err
}
// 保持租约活跃
ch, err := etcdClient.KeepAlive(context.Background(), lease.ID)
if err != nil {
return err
}
go func() {
for range ch {
// 保持心跳
}
log.Println("KeepAlive 通道关闭")
}()
return nil
}
// 服务发现 — 客户端负载均衡
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
etcdresolver "go.etcd.io/etcd/client/v3/naming/resolver"
)
func newClientWithDiscovery(etcdClient *clientv3.Client, serviceName string) (*grpc.ClientConn, error) {
// 创建 etcd 解析器
builder, err := etcdresolver.NewBuilder(etcdClient)
if err != nil {
return nil, err
}
conn, err := grpc.NewClient(
fmt.Sprintf("etcd:///%s", serviceName),
grpc.WithResolvers(builder),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
return conn, err
}
5.3 配置中心模式¶
// 配置结构
type Config struct {
Server ServerConfig `yaml:"server"`
Database DatabaseConfig `yaml:"database"`
Redis RedisConfig `yaml:"redis"`
}
type ServerConfig struct {
HTTP HTTPConfig `yaml:"http"`
GRPC GRPCConfig `yaml:"grpc"`
}
type HTTPConfig struct {
Addr string `yaml:"addr"`
Timeout time.Duration `yaml:"timeout"`
}
type GRPCConfig struct {
Addr string `yaml:"addr"`
Timeout time.Duration `yaml:"timeout"`
}
// 从 etcd 加载配置
func loadConfigFromEtcd(client *clientv3.Client, key string) (*Config, error) {
resp, err := client.Get(context.Background(), key)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return nil, fmt.Errorf("配置 %s 不存在", key)
}
var cfg Config
if err := yaml.Unmarshal(resp.Kvs[0].Value, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
// 监听配置变更
func watchConfig(client *clientv3.Client, key string, onChange func(*Config)) {
wch := client.Watch(context.Background(), key)
for resp := range wch {
for _, ev := range resp.Events {
if ev.Type == clientv3.EventTypePut {
var cfg Config
if err := yaml.Unmarshal(ev.Kv.Value, &cfg); err != nil {
log.Printf("配置解析失败: %v", err)
continue
}
onChange(&cfg)
}
}
}
}
📌 概念:go-kratos 框架¶
6.1 项目初始化¶
# 安装 kratos CLI
go install github.com/go-kratos/kratos/cmd/kratos/v2@latest
# 创建项目
kratos new myapp
# 项目结构
myapp/
├── api/ # protobuf 定义
│ └── helloworld/
│ └── v1/
│ └── greeter.proto
├── cmd/ # 入口
│ └── myapp/
│ ├── main.go
│ └── wire.go # 依赖注入
├── configs/ # 配置文件
│ └── config.yaml
├── internal/ # 内部代码
│ ├── biz/ # 业务逻辑
│ ├── data/ # 数据访问
│ ├── server/ # 服务器(HTTP + gRPC)
│ └── service/ # 服务实现
├── third_party/ # 第三方 proto
├── Makefile
└── go.mod
6.2 Kratos 服务实现¶
// internal/service/user.go
package service
import (
"context"
pb "myapp/api/user/v1"
"myapp/internal/biz"
)
type UserService struct {
pb.UnimplementedUserServiceServer
uc *biz.UserUseCase
}
func NewUserService(uc *biz.UserUseCase) *UserService {
return &UserService{uc: uc}
}
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
user, err := s.uc.Get(ctx, req.Id)
if err != nil {
return nil, err
}
return &pb.GetUserResponse{
User: &pb.User{
Id: user.ID,
Name: user.Name,
},
}, nil
}
// internal/biz/user.go — 业务逻辑层(Clean Architecture 的 UseCase)
package biz
import "context"
type User struct {
ID uint64
Name string
Email string
}
// 仓库接口(由 data 层实现)
type UserRepo interface {
GetByID(ctx context.Context, id uint64) (*User, error)
Create(ctx context.Context, user *User) error
Update(ctx context.Context, user *User) error
Delete(ctx context.Context, id uint64) error
}
type UserUseCase struct {
repo UserRepo
log *log.Helper
}
func NewUserUseCase(repo UserRepo, logger log.Logger) *UserUseCase {
return &UserUseCase{
repo: repo,
log: log.NewHelper(logger),
}
}
func (uc *UserUseCase) Get(ctx context.Context, id uint64) (*User, error) {
user, err := uc.repo.GetByID(ctx, id)
if err != nil {
uc.log.WithContext(ctx).Errorf("查询用户失败: %v", err)
return nil, err
}
return user, nil
}
// internal/server/grpc.go — gRPC 服务器注册
package server
import (
"myapp/internal/service"
pb "myapp/api/user/v1"
"github.com/go-kratos/kratos/v2/transport/grpc"
)
func NewGRPCServer(c *conf.Server, userSvc *service.UserService) *grpc.Server {
opts := []grpc.ServerOption{
grpc.Address(c.Grpc.Addr),
grpc.Timeout(c.Grpc.Timeout.AsDuration()),
}
srv := grpc.NewServer(opts...)
pb.RegisterUserServiceServer(srv, userSvc)
return srv
}
// cmd/myapp/main.go — 入口
package main
import (
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
)
func newApp(gs *grpc.Server, hs *http.Server) *kratos.App {
return kratos.New(
kratos.Name("myapp"),
kratos.Server(gs, hs),
)
}
func main() {
// wire 注入
app, cleanup, err := wireApp(...)
if err != nil {
panic(err)
}
defer cleanup()
if err := app.Run(); err != nil {
panic(err)
}
}
💻 代码示例:完整微服务项目¶
连接池与健康检查¶
import (
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
)
func newGRPCServer() *grpc.Server {
srv := grpc.NewServer(
// Keep-alive 参数
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Minute,
MaxConnectionAge: 30 * time.Minute,
MaxConnectionAgeGrace: 5 * time.Second,
Time: 5 * time.Minute,
Timeout: 1 * time.Second,
}),
// 拦截器链
grpc.ChainUnaryInterceptor(
RecoveryInterceptor,
UnaryServerInterceptor,
),
// 最大消息大小
grpc.MaxRecvMsgSize(10 * 1024 * 1024), // 10MB
)
// 注册健康检查服务
healthSrv := health.NewServer()
grpc_health_v1.RegisterHealthServer(srv, healthSrv)
healthSrv.SetServingStatus("user.UserService", grpc_health_v1.HealthCheckResponse_SERVING)
return srv
}
// 客户端连接配置
func newClientConn(addr string) (*grpc.ClientConn, error) {
return grpc.NewClient(addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
)
}
✅ 最佳实践¶
- proto 文件版本化: 使用
api/v1/路径管理 API 版本,避免破坏变更 - 使用 gRPC 标准错误码: 建立
codes.NotFound、codes.InvalidArgument等标准映射 - 拦截器分层: Recovery → 日志 → 认证 → 限流 → 业务
- Context 传递: 始终传递
ctx,用于超时、取消和链路追踪 - 优雅关停:
GracefulStop()替代Stop(),等待进行中的 RPC 完成 - 健康检查: 实现 gRPC Health Checking Protocol,配合 Kubernetes 就绪探针
- 幂等设计: 客户端重试时服务端必须保证幂等性
🎯 面试题¶
Q1: gRPC 和 REST 的主要区别是什么?¶
A: 1) 协议:gRPC 基于 HTTP/2,REST 通常基于 HTTP/1.1;2) 编码:gRPC 使用 protobuf(二进制),REST 使用 JSON(文本);3) 性能:gRPC 序列化/反序列化更快、传输更小;4) 流式:gRPC 原生支持四种流模式,REST 需额外方案;5) 代码生成:gRPC 强类型代码生成 vs REST 手写客户端;6) 浏览器支持:REST 更好,gRPC 需要 gRPC-Web 适配;7) 适用场景:gRPC 适合内部微服务通信,REST 适合公开 API。
Q2: gRPC 的四种通信模式分别适合什么场景?¶
A: 1) Unary(一元): 常规请求-响应,如 CRUD;2) Server Streaming(服务端流): 大量数据下行,如日志推送、报表下载;3) Client Streaming(客户端流): 大量数据上行,如文件上传、批量导入;4) Bidirectional Streaming(双向流): 实时交互,如聊天、游戏同步、实时监控。
Q3: 什么是 gRPC 拦截器?和 HTTP 中间件有什么区别?¶
A: gRPC 拦截器类似 HTTP 中间件,是在 RPC 调用前后执行的钩子函数。gRPC 分为 Unary Interceptor(一元)和 Stream Interceptor(流式)两种。区别在于:gRPC 拦截器分客户端/服务端两侧,可通过 grpc.ChainUnaryInterceptor 链式组合,处理的是 protobuf 消息而非 HTTP 请求。常见用途:认证、日志、限流、Recovery、链路追踪。
Q4: 微服务中服务发现有哪些模式?¶
A: 1) 客户端发现: 客户端查询注册中心获取服务列表并自行负载均衡(如 etcd + gRPC resolver);2) 服务端发现: 客户端请求负载均衡器,由它查询注册中心(如 Nginx, Kubernetes Service);3) DNS 发现: 基于 DNS SRV 记录;4) 主流方案:etcd(强一致性 Raft共识)、Consul(支持健康检查)、Nacos(阿里巴巴,Spring Cloud 生态常用)。
Q5: go-kratos 框架的分层架构有什么优势?¶
A: Kratos 遵循 Clean Architecture 分为四层:1) Server 层: HTTP/gRPC 传输层,模板化;2) Service 层: 接口转换,protobuf ↔ 业务对象;3) Biz 层: 核心业务逻辑,定义 Repository 接口(依赖倒置);4) Data 层: 数据访问实现。优势:业务逻辑不依赖框架/数据库,可独立测试,替换数据存储只需改 Data 层。使用 Wire 做编译期依赖注入,避免反射开销。
Q6: gRPC 的 status 包如何实现丰富的错误处理?¶
A: google.golang.org/grpc/status 提供标准错误码(17个,如 NotFound, InvalidArgument, Internal 等)。通过 status.New(code, msg) 创建错误,用 WithDetails() 附加结构化信息(如 errdetails.BadRequest 的字段验证错误、errdetails.RetryInfo 的重试建议等)。客户端用 status.FromError() 解析错误码和详情。这比 HTTP 状态码更精细,是微服务间错误传递的最佳实践。
📋 学习检查清单¶
- 能编写 protobuf 文件并生成 Go 代码
- 能实现 gRPC 的四种通信模式
- 理解并能编写服务端/客户端拦截器
- 掌握 gRPC 错误处理和元数据传递
- 了解微服务架构中的服务注册/发现
- 理解配置中心的工作原理
- 了解 go-kratos 框架的分层架构
- 能搭建完整的 gRPC 微服务项目