gRPC服务开发 grpc-service-development

构建高性能gRPC服务,支持一元(unary)调用、客户端流、服务器流和双向流模式,适用于微服务架构和多语言服务交互。

后端开发 0 次安装 0 次浏览 更新于 3/4/2026

gRPC服务开发

概述

使用Protocol Buffers开发高效的gRPC服务,支持一元(unary)调用、客户端流、服务器流和双向流模式。

何时使用

  • 构建需要高性能的微服务
  • 使用Protocol Buffers定义服务契约
  • 实施实时双向通信
  • 创建内部服务到服务的API
  • 在带宽受限的环境中进行优化
  • 构建多语言服务架构

指令

1. Protocol Buffer服务定义

syntax = "proto3";

package user.service;

// A user record. Returned by the UserService RPCs and embedded in
// ListUsersResponse.
message User {
  string id = 1;          // identifier used as the lookup key by Get/Update/Delete requests
  string email = 2;
  string first_name = 3;
  string last_name = 4;
  string role = 5;
  // NOTE(review): the timestamp unit is not specified here. The sample Node.js
  // server in this guide stores Date.now() (milliseconds) while the sample
  // Python server stores whole seconds; confirm the intended unit.
  int64 created_at = 6;
  int64 updated_at = 7;
}

// Input for UserService.CreateUser and for each element of the
// BulkCreateUsers client stream. Carries no id or timestamps; those
// fields exist only on User.
message CreateUserRequest {
  string email = 1;
  string first_name = 2;
  string last_name = 3;
  string role = 4;
}

// Input for UserService.UpdateUser. `id` selects the user to update.
// There is no `role` field, so the role cannot be changed through this message.
message UpdateUserRequest {
  string id = 1;
  string email = 2;
  string first_name = 3;
  string last_name = 4;
}

// Input for UserService.GetUser.
message GetUserRequest {
  string id = 1;
}

// Pagination parameters for UserService.ListUsers.
message ListUsersRequest {
  int32 page = 1;
  int32 limit = 2;
}

// A list of users plus a total count and the page number.
// Also used as the summary response of BulkCreateUsers.
message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
  int32 page = 3;
}

// Input for UserService.DeleteUser.
message DeleteUserRequest {
  string id = 1;
}

// Placeholder message with no fields, used where an RPC takes no input
// or returns no payload.
message Empty {}

// CRUD-style user API. The first five RPCs are unary (one request, one
// response); the last two use streaming.
service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc UpdateUser(UpdateUserRequest) returns (User);
  rpc DeleteUser(DeleteUserRequest) returns (Empty);
  // Server streaming: one Empty request, a stream of User messages back.
  rpc StreamUsers(Empty) returns (stream User);
  // Client streaming: a stream of CreateUserRequest messages, one summary response.
  rpc BulkCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
}

// An event message, either published by a caller or delivered to a subscriber.
message Event {
  string type = 1;
  string user_id = 2;
  string data = 3;        // payload as a string; its format is not defined in this file
  int64 timestamp = 4;    // unit is not specified in this file
}

// Event API. Neither sample server in this guide registers an
// implementation for this service.
service EventService {
  // Server streaming: one Empty request, a stream of Event messages back.
  rpc Subscribe(Empty) returns (stream Event);
  // Unary: submits a single Event.
  rpc PublishEvent(Event) returns (Empty);
}

2. Node.js gRPC服务器实现

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

// Loader options: keepCase keeps the snake_case field names from user.proto
// (the handlers below read `first_name`, `last_name`, ...); longs and enums
// are surfaced as strings; defaults fills unset fields with default values.
const loaderOptions = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
};
const protoFile = path.join(__dirname, 'user.proto');
const packageDef = protoLoader.loadSync(protoFile, loaderOptions);

// Namespace object for the `user.service` proto package.
const { service: userProto } = grpc.loadPackageDefinition(packageDef).user;

// In-memory user store keyed by user id (string). Contents live only as
// long as the process does.
const users = new Map();
// Counter used to mint sequential ids: '1', '2', ...
let userIdCounter = 1;

/**
 * Returns `value` when it is a number greater than zero, otherwise `fallback`.
 * Guards pagination inputs: zero (the proto3 default for an unset int32),
 * negative numbers and undefined all fall back.
 *
 * @param {number|undefined} value - Requested value.
 * @param {number} fallback - Value to use when `value` is not positive.
 * @returns {number}
 */
function positiveOrDefault(value, fallback) {
  return value > 0 ? value : fallback;
}

/**
 * Builds a user record from a CreateUserRequest-shaped object, assigns the
 * next sequential id, stores it in `users` and returns it.
 * Shared by `createUser` and `bulkCreateUsers` so both create identical records.
 *
 * @param {{email: string, first_name: string, last_name: string, role: string}} request
 * @returns {object} The stored user record.
 */
function storeNewUser(request) {
  const id = String(userIdCounter++);
  // Read the clock once so created_at and updated_at always match on creation.
  const now = Date.now();
  const user = {
    id,
    email: request.email,
    first_name: request.first_name,
    last_name: request.last_name,
    role: request.role,
    created_at: now,
    updated_at: now
  };
  users.set(id, user);
  return user;
}

/**
 * Handlers for the UserService RPCs, keyed by camel-cased RPC name.
 * Unary and client-streaming handlers receive `(call, callback)`;
 * the server-streaming handler receives only `call`.
 */
const userServiceImpl = {
  // Unary: looks up a user by id; responds with NOT_FOUND when absent.
  getUser: (call, callback) => {
    const user = users.get(call.request.id);
    if (!user) {
      return callback({ code: grpc.status.NOT_FOUND, details: '用户未找到' });
    }
    callback(null, user);
  },

  // Unary: returns one page of users. `page` defaults to 1 and `limit` to 20.
  // Non-positive values also fall back to those defaults, because a negative
  // offset or limit would make Array.prototype.slice count from the end of
  // the array and silently return the wrong users.
  listUsers: (call, callback) => {
    const page = positiveOrDefault(call.request.page, 1);
    const limit = positiveOrDefault(call.request.limit, 20);
    const offset = (page - 1) * limit;

    const userArray = Array.from(users.values());

    callback(null, {
      users: userArray.slice(offset, offset + limit),
      total: userArray.length,
      page
    });
  },

  // Unary: creates and stores a user, then returns it.
  createUser: (call, callback) => {
    callback(null, storeNewUser(call.request));
  },

  // Unary: updates email / first_name / last_name in place. An empty string
  // (the proto3 default for an unset field) keeps the current value.
  updateUser: (call, callback) => {
    const user = users.get(call.request.id);
    if (!user) {
      return callback({ code: grpc.status.NOT_FOUND, details: '用户未找到' });
    }

    Object.assign(user, {
      email: call.request.email || user.email,
      first_name: call.request.first_name || user.first_name,
      last_name: call.request.last_name || user.last_name,
      updated_at: Date.now()
    });

    callback(null, user);
  },

  // Unary: removes the user if present. Deleting an unknown id still succeeds.
  deleteUser: (call, callback) => {
    users.delete(call.request.id);
    callback(null, {});
  },

  // Server streaming: writes every stored user to the call, then ends it.
  streamUsers: (call) => {
    for (const user of Array.from(users.values())) {
      call.write(user);
    }
    call.end();
  },

  // Client streaming: stores one user per incoming message and replies with
  // the full list of created users once the client ends the stream.
  bulkCreateUsers: (call, callback) => {
    const createdUsers = [];

    call.on('data', (request) => {
      createdUsers.push(storeNewUser(request));
    });

    call.on('end', () => {
      callback(null, { users: createdUsers, total: createdUsers.length, page: 1 });
    });

    call.on('error', (err) => {
      callback(err);
    });
  }
};

// Create the server and register the UserService handlers.
const server = new grpc.Server();
server.addService(userProto.UserService.service, userServiceImpl);

// NOTE: this listens on all interfaces with insecure (plaintext) credentials,
// which is only suitable for local development; use TLS credentials in production.
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), (err) => {
  // The bind can fail (for example when the port is already in use). Report it
  // instead of logging success and starting a server that was never bound.
  if (err) {
    console.error('gRPC服务器绑定失败:', err);
    process.exitCode = 1;
    return;
  }
  console.log('gRPC服务器在50051端口运行');
  // Needed by older @grpc/grpc-js releases; newer releases are understood to
  // start serving after bindAsync and treat this call as deprecated.
  server.start();
});

3. Python gRPC服务器(grpcio)

import threading
from concurrent import futures
from datetime import datetime

import grpc

import user_pb2
import user_pb2_grpc

class UserServicer(user_pb2_grpc.UserServiceServicer):
    """In-memory implementation of the UserService RPCs.

    Users are kept in a dict keyed by their string id. The server created in
    ``serve()`` runs handlers on a thread pool, so every access to the shared
    dict and id counter is guarded by a lock.
    """

    def __init__(self):
        self.users = {}
        self.user_counter = 1
        # Protects self.users and self.user_counter across worker threads.
        self._lock = threading.Lock()

    def GetUser(self, request, context):
        """Return the user with ``request.id``, or set NOT_FOUND on the context."""
        with self._lock:
            user = self.users.get(request.id)
        if user is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('用户未找到')
            return user_pb2.User()
        return user

    def ListUsers(self, request, context):
        """Return one page of users.

        ``page`` defaults to 1 and ``limit`` to 20. Non-positive values also
        fall back to those defaults, because a negative offset or limit would
        make the slice count from the end of the list.
        """
        with self._lock:
            users_list = list(self.users.values())
        page = request.page if request.page > 0 else 1
        limit = request.limit if request.limit > 0 else 20
        offset = (page - 1) * limit

        return user_pb2.ListUsersResponse(
            users=users_list[offset:offset + limit],
            total=len(users_list),
            page=page
        )

    def CreateUser(self, request, context):
        """Create a user from the request, store it and return it."""
        # NOTE(review): whole seconds since the epoch. The sample Node.js server
        # in this guide stores Date.now() (milliseconds) in the same fields;
        # confirm which unit the contract intends.
        now = int(datetime.now().timestamp())
        with self._lock:
            # Allocate the id and insert under one lock so concurrent calls
            # can never mint the same id.
            user_id = str(self.user_counter)
            self.user_counter += 1

            user = user_pb2.User(
                id=user_id,
                email=request.email,
                first_name=request.first_name,
                last_name=request.last_name,
                role=request.role,
                created_at=now,
                updated_at=now
            )
            self.users[user_id] = user
        return user

    def UpdateUser(self, request, context):
        """Update email / first_name / last_name of an existing user.

        An empty string (the proto3 default for an unset field) keeps the
        current value. Sets NOT_FOUND on the context when the id is unknown.
        """
        with self._lock:
            user = self.users.get(request.id)
            if user is not None:
                if request.email:
                    user.email = request.email
                if request.first_name:
                    user.first_name = request.first_name
                if request.last_name:
                    user.last_name = request.last_name
                user.updated_at = int(datetime.now().timestamp())
        if user is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('用户未找到')
            return user_pb2.User()
        return user

    def DeleteUser(self, request, context):
        """Remove the user if present; deleting an unknown id still succeeds."""
        with self._lock:
            self.users.pop(request.id, None)
        return user_pb2.Empty()

    def StreamUsers(self, request, context):
        """Yield every stored user (server streaming)."""
        # Iterate over a snapshot: inserting into the dict from another thread
        # while iterating it directly would raise RuntimeError.
        with self._lock:
            snapshot = list(self.users.values())
        for user in snapshot:
            yield user

    def BulkCreateUsers(self, request_iterator, context):
        """Create one user per streamed request and return them all (client streaming)."""
        created_users = [
            self.CreateUser(request, context) for request in request_iterator
        ]

        return user_pb2.ListUsersResponse(
            users=created_users,
            total=len(created_users),
            page=1
        )

def serve():
    """Start the gRPC server on port 50051 and block until it terminates."""
    # Handlers run on this pool, so up to 10 may execute at the same time.
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    user_pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), grpc_server)
    # Plaintext listener; no TLS is configured here.
    grpc_server.add_insecure_port('[::]:50051')
    grpc_server.start()
    print('gRPC服务器在50051端口运行')
    grpc_server.wait_for_termination()


if __name__ == '__main__':
    serve()

4. 客户端实现

const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');

// Load the proto with keepCase: true. By default the loader converts field
// names to camelCase, and the snake_case keys this client sends
// (`first_name`, `last_name`) would then not match any field and be dropped.
const packageDef = protoLoader.loadSync(
  path.join(__dirname, 'user.proto'),
  { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }
);

const userProto = grpc.loadPackageDefinition(packageDef).user.service;
// Plaintext channel to a local server; use TLS credentials outside local development.
const client = new userProto.UserService('localhost:50051', grpc.credentials.createInsecure());

// Unary call
client.getUser({ id: '123' }, (err, user) => {
  if (err) {
    // On failure `user` is undefined; stop here instead of logging it.
    console.error(err);
    return;
  }
  console.log('用户:', user);
});

// Server streaming
const stream = client.streamUsers({});
stream.on('data', (user) => {
  console.log('接收到用户:', user);
});
// Without an 'error' listener a failed call would surface as an unhandled
// 'error' event and crash the process.
stream.on('error', (err) => {
  console.error(err);
});
stream.on('end', () => {
  console.log('流结束');
});

// Client streaming
const writeStream = client.bulkCreateUsers((err, response) => {
  if (err) {
    // `response` is undefined on failure; reading response.users would throw.
    console.error(err);
    return;
  }
  console.log('创建用户:', response.users.length);
});

writeStream.write({ email: 'user1@example.com', first_name: 'John', last_name: 'Doe' });
writeStream.write({ email: 'user2@example.com', first_name: 'Jane', last_name: 'Smith' });
writeStream.end();

最佳实践

✅ 做

  • 使用清晰的消息和服务命名
  • 实现适当的错误处理与gRPC状态码
  • 添加元数据用于日志记录和跟踪
  • 对protobuf定义进行版本管理
  • 对于大数据集使用流
  • 为调用设置超时和截止时间(deadline)
  • 监控gRPC指标

❌ 避免

  • 在浏览器客户端中直接使用gRPC(应改用gRPC-Web)
  • 在proto定义中暴露敏感数据
  • 创建深度嵌套的消息
  • 忽略错误状态码
  • 发送未压缩的大负载
  • 在生产中跳过TLS安全

部署

# Generate protobuf code (this invocation emits Go stubs via --go_out / --go-grpc_out)
protoc --go_out=. --go-grpc_out=. *.proto

# Install the Node.js gRPC server dependencies (no compile step is involved)
npm install @grpc/grpc-js @grpc/proto-loader

# Install the Python gRPC packages, then generate the Python stubs from user.proto
pip install grpcio grpcio-tools
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user.proto