gRPC 集成
gRPC 集成
概览
gRPC(Google 远程过程调用)是一个高性能、开源的通用 RPC 框架,使用 Protocol Buffers 进行序列化,非常适合微服务通信,具有高效的二进制序列化、流控制能力和内置的代码生成功能。
gRPC 包括:
- HTTP/2:高性能传输协议
- Protocol Buffers:高效的二进制序列化格式
- 代码生成:自动生成客户端和服务器代码
- 流控制:支持单向、服务器流、客户端流、双向流
- 多语言:支持多种编程语言
- 强类型:使用 .proto 文件的类型安全契约
为什么这很重要
- 提升性能:gRPC 可以比 REST 提升 5-10 倍的性能
- 降低延迟:HTTP/2 多路复用降低延迟
- 减少带宽:Protocol Buffers 减少负载大小高达 70%
- 提高开发者生产力:代码生成减少样板代码
- 改善类型安全:强类型减少运行时错误
核心概念
1. Proto 文件定义
syntax = "proto3";
package user.v1;
import "google/protobuf/timestamp.proto";
service UserService {
// 单向 RPC
rpc GetUser(GetUserRequest) returns (User);
// 服务器流 RPC
rpc ListUsers(ListUsersRequest) returns (stream User);
// 客户端流 RPC
rpc CreateUserBatch(stream CreateUserRequest) returns (CreateUserBatchResponse);
// 双向流 RPC
rpc UserEvents(stream UserEventRequest) returns (stream UserEventResponse);
}
message GetUserRequest {
string id = 1;
}
message User {
string id = 1;
string name = 2;
string email = 3;
google.protobuf.Timestamp created_at = 4;
}
2. Node.js 服务器实现
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const PROTO_PATH = './proto/user_service.proto';
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
});
const userProto = grpc.loadPackageDefinition(packageDefinition).user.v1;
const userService = {
getUser: (call, callback) => {
const user = users.get(call.request.getId());
if (!user) {
const error = new Error('用户未找到');
error.code = grpc.status.NOT_FOUND;
return callback(error);
}
callback(null, user);
},
createUser: (call, callback) => {
const user = {
id: uuidv4(),
name: call.request.getName(),
email: call.request.getEmail(),
};
users.set(user.id, user);
callback(null, user);
},
};
const server = new grpc.Server();
server.addService(userProto.UserService.service, userService);
server.bindAsync(
'0.0.0.0:50051',
grpc.ServerCredentials.createInsecure(),
(error, port) => {
console.log(`服务器运行在端口 ${port}`);
}
);
3. Node.js 客户端实现
const client = new userProto.UserService(
'localhost:50051',
grpc.credentials.createInsecure()
);
// 单向调用
async function getUser(id) {
return new Promise((resolve, reject) => {
const request = new userProto.GetUserRequest();
request.setId(id);
client.getUser(request, (error, response) => {
if (error) return reject(error);
resolve(response.toObject());
});
});
}
// 服务器流
async function listUsers() {
return new Promise((resolve, reject) => {
const call = client.listUsers(new userProto.ListUsersRequest());
const users = [];
call.on('data', (user) => {
users.push(user.toObject());
});
call.on('end', () => {
resolve(users);
});
call.on('error', reject);
});
}
4. 认证拦截器
const authInterceptor = (options, nextCall) => {
return new grpc.ServerInterceptingCall(nextCall(options), {
start: (metadata, listener, next) => {
const token = metadata.get('authorization')?.[0]?.replace('Bearer ', '');
if (!token) {
return next(null, {
status: { code: grpc.status.UNAUTHENTICATED, details: '缺少令牌' },
});
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
options.user = decoded;
next(metadata, listener);
} catch (error) {
return next(null, {
status: { code: grpc.status.UNAUTHENTICATED, details: '无效令牌' },
});
}
},
});
};
const server = new grpc.Server({
interceptors: [authInterceptor],
});
5. TLS 配置
const serverCredentials = grpc.ServerCredentials.createSsl(
fs.readFileSync('ca.crt'),
[
{
cert_chain: fs.readFileSync('server.crt'),
private_key: fs.readFileSync('server.key'),
},
],
false
);
server.bindAsync(
'0.0.0.0:50051',
serverCredentials,
(error, port) => {
console.log(`安全服务器运行在端口 ${port}`);
}
);
6. 截止时间配置
// 客户端截止时间
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 5);
client.getUser(request, { deadline }, (error, response) => {
if (error && error.code === grpc.status.DEADLINE_EXCEEDED) {
console.error('请求超时');
}
});
7. DNS 负载均衡
const client = new userProto.UserService(
'dns:///user-service:50051', // 基于 DNS 的服务发现
grpc.credentials.createInsecure(),
{
'grpc.load_balancing_config': [
{ round_robin: {} },
],
}
);
快速开始
-
安装 gRPC:
npm install @grpc/grpc-js @grpc/proto-loader -
定义 proto 文件:
syntax = "proto3"; package user.v1; service UserService { rpc GetUser(GetUserRequest) returns (User); } -
生成代码(如果需要):
protoc --js_out=import_style=commonjs,binary:. user.proto -
创建服务器:
const server = new grpc.Server(); server.addService(userProto.UserService.service, userService); server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure()); -
创建客户端:
const client = new userProto.UserService('localhost:50051', grpc.credentials.createInsecure());
生产清单
- [ ] 使用语义化版本控制 proto 包
- [ ] 将 proto 文件存放在单独的仓库中
- [ ] 为所有服务和消息编制文档
- [ ] 使用适当的 gRPC 状态码
- [ ] 为所有 RPC 实施截止时间
- [ ] 在生产中使用 TLS
- [ ] 实施认证和授权
- [ ] 添加拦截器用于日志和度量
- [ ] 使用连接池
- [ ] 实施健康检查
- [ ] 设置分布式追踪
- [ ] 对于大数据集使用流控制
- [ ] 实施优雅关闭
- [ ] 添加输入验证
- [ ] 监控性能指标
反模式
- 忽略截止时间:为所有 RPC 设置截止时间
- 阻塞调用:使用异步模式避免阻塞
- 无错误处理:适当处理所有 gRPC 状态码
- 跳过 TLS:在生产中始终使用 TLS
- 大消息:保持消息小并使用流控制
- 无可观测性:使用度量和追踪监控所有 RPC
- 版本控制问题:对 proto 包使用语义化版本控制
集成点
- Express REST:
03-backend-api/express-rest - Node.js API:
03-backend-api/nodejs-api - WebSocket 模式:
03-backend-api/websocket-patterns - 服务设计:
09-microservices/service-design - 服务发现:
09-microservices/service-discovery - 连接池:
04-database/connection-pooling