JSON-RPC 协议技能
name: json-rpc-expert
risk_level: 中等
description: JSON-RPC 2.0 协议实现专家,包括消息分发、错误处理、批量处理和安全 RPC 端点
version: 1.0.0
author: JARVIS AI 助手
tags: [protocol, json-rpc, api, rpc, messaging]
1. 概述
风险等级: 中等风险
理由: JSON-RPC 端点处理远程过程调用,可以执行服务器端代码,容易受到注入攻击、拒绝服务攻击和错误处理不当导致信息泄露的漏洞。
您是 JSON-RPC 2.0 协议实现的专家。您构建安全、符合标准的 RPC 服务器和客户端,具备正确的消息分发、错误处理和批量处理功能。
核心专长
- JSON-RPC 2.0 规范合规性
- 方法分发和路由
- 错误代码标准化
- 批量请求处理
- 传输层集成
主要应用场景
- 为微服务构建 JSON-RPC 服务器
- 实现 RPC 客户端
- 批量操作优化
- 错误处理标准化
文件组织: 主要概念在此处;有关 CVE 缓解措施,请参见 references/security-examples.md。
2. 核心原则
- 测试驱动开发优先: 在部署前编写测试,验证 RPC 方法、错误处理和批量处理是否正确工作
- 性能意识: 通过连接池、批量请求和响应缓存优化吞吐量
- 设计安全: 白名单方法、验证输入、清理错误
- 规范合规: 严格遵循 JSON-RPC 2.0
3. 核心职责
基本职责
- 规范合规: 正确实现 JSON-RPC 2.0
- 安全方法分发: 在执行前验证方法
- 正确错误处理: 使用标准错误代码,隐藏内部信息
- 批量处理: 安全高效地处理批量请求
安全原则
- 方法白名单: 仅暴露注册的方法
- 输入验证: 验证所有参数
- 速率限制: 防止滥用
- 错误清理: 绝不暴露堆栈跟踪
4. 技术基础
JSON-RPC 2.0 消息格式
// 请求
interface JSONRPCRequest {
jsonrpc: "2.0";
method: string;
params?: unknown[] | Record<string, unknown>;
id?: string | number | null;
}
// 响应
interface JSONRPCResponse {
jsonrpc: "2.0";
result?: unknown;
error?: JSONRPCError;
id: string | number | null;
}
// 错误
interface JSONRPCError {
code: number;
message: string;
data?: unknown;
}
标准错误代码
| 代码 | 消息 | 含义 |
|---|---|---|
| -32700 | 解析错误 | 无效 JSON |
| -32600 | 无效请求 | 非有效 JSON-RPC |
| -32601 | 方法未找到 | 方法不存在 |
| -32602 | 无效参数 | 无效方法参数 |
| -32603 | 内部错误 | 内部 JSON-RPC 错误 |
| -32000 到 -32099 | 服务器错误 | 实现定义 |
5. 实现工作流程(测试驱动开发)
步骤 1: 先编写失败的测试
# tests/test_rpc_methods.py
import pytest
from jsonrpc_server import JSONRPCServer
class TestRPCMethods:
@pytest.fixture
def server(self):
return JSONRPCServer()
def test_method_not_found(self, server):
response = server.handle_request({"jsonrpc": "2.0", "method": "nonexistent", "id": 1})
assert response["error"]["code"] == -32601
def test_invalid_params(self, server):
server.register_method("transfer", transfer_handler, TransferSchema)
response = server.handle_request({"jsonrpc": "2.0", "method": "transfer", "params": {"amount": "bad"}, "id": 1})
assert response["error"]["code"] == -32602
def test_batch_request_limit(self, server):
requests = [{"jsonrpc": "2.0", "method": "ping", "id": i} for i in range(200)]
response = server.handle_request(requests)
assert response[0]["error"]["code"] == -32600
def test_successful_method_call(self, server):
server.register_method("add", lambda p: p["a"] + p["b"], AddSchema)
response = server.handle_request({"jsonrpc": "2.0", "method": "add", "params": {"a": 2, "b": 3}, "id": 1})
assert response["result"] == 5
步骤 2: 实现最小功能以通过测试
# jsonrpc_server.py
class JSONRPCServer:
def __init__(self):
self.methods = {}
self.max_batch_size = 100
def register_method(self, name, handler, schema):
self.methods[name] = {"handler": handler, "schema": schema}
def handle_request(self, request):
if isinstance(request, list):
return self._handle_batch(request)
return self._handle_single(request)
def _handle_single(self, request):
method = request.get("method")
if method not in self.methods:
return self._error(request.get("id"), -32601, "Method not found")
# ... 实现验证和执行
步骤 3: 使用完整模式重构
应用以下部分的安全模式、错误处理和性能优化。
步骤 4: 运行完整验证
pytest tests/test_rpc_methods.py -v # 运行所有测试
pytest --cov=jsonrpc_server --cov-report=term-missing # 覆盖率
pytest tests/test_rpc_security.py -v # 安全测试
pytest tests/test_rpc_performance.py --benchmark-only # 基准测试
6. 实现模式
6.1 安全 JSON-RPC 服务器
import { z } from "zod";
class JSONRPCServer {
private methods: Map<string, MethodHandler> = new Map();
registerMethod<T>(name: string, schema: z.ZodSchema<T>, handler: (params: T) => Promise<unknown>): void {
if (!/^[a-zA-Z][a-zA-Z0-9_.]*$/.test(name)) throw new Error("Invalid method name");
this.methods.set(name, { schema, handler });
}
async handleRequest(request: unknown): Promise<JSONRPCResponse | JSONRPCResponse[]> {
let parsed: unknown;
try {
parsed = typeof request === "string" ? JSON.parse(request) : request;
} catch { return this.createError(null, -32700, "Parse error"); }
if (Array.isArray(parsed)) {
if (parsed.length === 0) return this.createError(null, -32600, "Invalid Request");
return Promise.all(parsed.map(req => this.handleSingleRequest(req)));
}
return this.handleSingleRequest(parsed);
}
private async handleSingleRequest(request: unknown): Promise<JSONRPCResponse> {
if (!this.validateRequest(request)) return this.createError(null, -32600, "Invalid Request");
const { method, params, id } = request as JSONRPCRequest;
const handler = this.methods.get(method);
if (!handler) return this.createError(id, -32601, "Method not found");
const paramValidation = handler.schema.safeParse(params);
if (!paramValidation.success) return this.createError(id, -32602, "Invalid params");
try {
const result = await handler.handler(paramValidation.data);
if (id === undefined) return null as unknown as JSONRPCResponse;
return { jsonrpc: "2.0", result, id };
} catch (error) {
console.error("Method execution error:", error);
return this.createError(id, -32603, "Internal error");
}
}
private createError(id: string | number | null, code: number, message: string): JSONRPCResponse {
return { jsonrpc: "2.0", error: { code, message }, id };
}
private validateRequest(request: unknown): boolean {
if (typeof request !== "object" || request === null) return false;
const req = request as Record<string, unknown>;
return req.jsonrpc === "2.0" && typeof req.method === "string";
}
}
6.2 带授权的方法注册
const server = new JSONRPCServer();
// 公共方法
server.registerMethod("getStatus", z.object({}), async () => ({ status: "healthy" }));
// 认证方法
server.registerMethod("getUserData", z.object({
userId: z.string().uuid(),
authToken: z.string().min(1)
}), async (params) => {
const user = await verifyAuthToken(params.authToken);
if (!user) throw new Error("Unauthorized");
if (user.id !== params.userId && !user.isAdmin) throw new Error("Forbidden");
return await getUserData(params.userId);
});
// 仅管理员方法
server.registerMethod("admin.deleteUser", z.object({
userId: z.string().uuid(),
authToken: z.string().min(1)
}), async (params) => {
const user = await verifyAuthToken(params.authToken);
if (!user?.isAdmin) throw new Error("Admin access required");
return await deleteUser(params.userId);
});
6.3 带限制的批量处理
// 安全批量处理
async handleBatchRequest(requests: JSONRPCRequest[]): Promise<JSONRPCResponse[]> {
// 限制批量大小
const MAX_BATCH_SIZE = 100;
if (requests.length > MAX_BATCH_SIZE) {
return [this.createError(null, -32600, `Batch size exceeds limit of ${MAX_BATCH_SIZE}`)];
}
// 使用并发限制处理
const CONCURRENCY_LIMIT = 10;
const results: JSONRPCResponse[] = [];
for (let i = 0; i < requests.length; i += CONCURRENCY_LIMIT) {
const batch = requests.slice(i, i + CONCURRENCY_LIMIT);
const batchResults = await Promise.all(
batch.map(req => this.handleSingleRequest(req))
);
results.push(...batchResults.filter(r => r !== null));
}
return results;
}
6.4 HTTP 传输集成
import express from "express";
import helmet from "helmet";
import rateLimit from "express-rate-limit";
const app = express();
app.use(helmet());
app.use(express.json({ limit: "1mb" }));
app.use("/rpc", rateLimit({
windowMs: 60000, max: 100,
message: { jsonrpc: "2.0", error: { code: -32000, message: "Rate limit exceeded" }, id: null }
}));
app.post("/rpc", async (req, res) => {
if (req.headers["content-type"] !== "application/json") {
return res.status(415).json({ jsonrpc: "2.0", error: { code: -32700, message: "Invalid content-type" }, id: null });
}
const response = await server.handleRequest(req.body);
if (!response || (Array.isArray(response) && !response.length)) return res.status(204).end();
res.json(response);
});
7. 性能模式
7.1 批量请求
// 不好: 多个单独请求
for (const item of items) { await client.call("process", { item }); }
// 好: 单个批量请求
const batch = items.map((item, i) => ({ jsonrpc: "2.0", method: "process", params: { item }, id: i }));
const results = await client.batch(batch);
7.2 连接池
// 不好: 每次请求新连接
const client = new RPCClient(url); // 每次调用创建新连接
// 好: 从池中复用连接
const pool = new RPCClientPool(url, { maxConnections: 10 });
const client = await pool.acquire();
try { return await client.call(method, params); } finally { pool.release(client); }
7.3 响应缓存
// 不好: 每次数据库查询
server.registerMethod("getConfig", schema, async () => await db.query("SELECT * FROM config"));
// 好: 带 TTL 的 LRU 缓存
const cache = new LRUCache({ max: 1000, ttl: 60000 });
server.registerMethod("getConfig", schema, async (params) => {
const key = `config:${params.section}`;
return cache.get(key) || cache.set(key, await db.query("SELECT * FROM config WHERE section = ?", [params.section]));
});
7.4 流式处理大结果
// 不好: 加载整个数据集(内存溢出风险)
server.registerMethod("exportData", schema, async () => await db.query("SELECT * FROM huge_table"));
// 好: 分页结果
server.registerMethod("exportData", schema, async ({ cursor = 0, limit = 100 }) => {
const data = await db.query("SELECT * FROM huge_table WHERE id > ? LIMIT ?", [cursor, limit]);
return { data, nextCursor: data.length === limit ? data[data.length - 1].id : null };
});
7.5 负载优化
// 不好: 返回所有字段(50KB)
server.registerMethod("getUser", schema, async ({ id }) => await getUser(id));
// 好: 仅返回请求字段(500B)
server.registerMethod("getUser", schema, async ({ id, fields }) => {
const user = await getUser(id);
return fields ? Object.fromEntries(fields.map(f => [f, user[f]])) : user;
});
8. 安全标准
8.1 领域漏洞概况
完整 CVE 详情请参见
references/security-examples.md。
主要漏洞:
- 方法注入: 访问未注册/内部方法
- 参数注入: 恶意参数导致代码执行
- 批量拒绝服务: 大量批量请求消耗资源
- 错误信息泄露: 错误中包含堆栈跟踪
8.2 输入验证
// 使用 Zod 进行完整参数验证
const TransferSchema = z.object({
from: z.string().uuid(),
to: z.string().uuid(),
amount: z.number().positive().max(1000000),
currency: z.enum(["USD", "EUR", "GBP"]),
memo: z.string().max(200).optional()
}).refine(data => data.from !== data.to, "Cannot transfer to same account");
server.registerMethod("transfer", TransferSchema, async (params) => executeTransfer(params));
8.3 错误处理
// 安全错误响应 - 内部记录详细信息,返回通用消息
class SafeJSONRPCError extends Error {
constructor(public code: number, message: string, private internal?: string) { super(message); }
toResponse(id: string | number | null): JSONRPCResponse {
if (this.internal) console.error(`RPC Error [${this.code}]: ${this.internal}`);
return { jsonrpc: "2.0", error: { code: this.code, message: this.message }, id };
}
}
// 用法: 内部信息记录但不返回给客户端
throw new SafeJSONRPCError(-32603, "Internal error", `DB failed: ${dbError.message}`);
9. 常见错误
绝不要: 执行动态方法
// 不好: 从用户输入中任意访问方法
const fn = this[request.method]; return fn(request.params);
// 好: 仅白名单注册方法
const handler = this.registeredMethods.get(request.method);
if (!handler) throw new Error("Method not found");
return handler(request.params);
绝不要: 返回内部错误
// 不好: 暴露堆栈跟踪
catch (error) { return { error: { code: -32603, message: error.stack } }; }
// 好: 内部记录,返回通用消息
catch (error) { console.error(error); return { error: { code: -32603, message: "Internal error" } }; }
10. 预实现检查清单
阶段 1: 编写代码前
- [ ] 为 RPC 方法和错误处理编写失败的测试
- [ ] 定义所有方法的参数模式
- [ ] 记录方法白名单
- [ ] 为受保护方法计划认证策略
阶段 2: 实现过程中
- [ ] 所有方法使用显式白名单注册
- [ ] 使用模式验证参数(Zod/Pydantic)
- [ ] 强制执行批量大小限制(最大 100)
- [ ] 配置每个端点的速率限制
- [ ] 清理错误消息(无堆栈跟踪)
- [ ] 设置请求大小限制(最大 1MB)
- [ ] 方法执行超时
阶段 3: 提交前
- [ ] 所有测试通过:
pytest tests/test_rpc_*.py -v - [ ] 安全测试通过:
pytest tests/test_rpc_security.py -v - [ ] 性能基准可接受
- [ ] 为所有方法调用启用审计日志
- [ ] 更新新方法的文档
11. 总结
您的目标是实现 JSON-RPC 服务,使其:
- 合规: 严格遵循 JSON-RPC 2.0 规范
- 安全: 验证所有输入,白名单方法,清理错误
- 健壮: 安全处理批量请求,强制执行限制,操作超时
记住: 每个 RPC 方法都是一个潜在的攻击向量。验证参数,授权访问,绝不暴露错误响应中的内部信息。