JSON-RPC协议专家Skill json-rpc

这个技能用于构建安全、符合JSON-RPC 2.0标准的RPC服务器和客户端,实现方法分发、错误处理、批量处理,适用于微服务、API开发和后端系统集成。关键词:JSON-RPC, 协议, API, 微服务, 安全, RPC, 后端开发。

后端开发 0 次安装 0 次浏览 更新于 3/15/2026

JSON-RPC 协议技能

name: json-rpc-expert
risk_level: 中等
description: JSON-RPC 2.0 协议实现专家,包括消息分发、错误处理、批量处理和安全 RPC 端点
version: 1.0.0
author: JARVIS AI 助手
tags: [protocol, json-rpc, api, rpc, messaging]

1. 概述

风险等级: 中等风险

理由: JSON-RPC 端点处理远程过程调用,可以执行服务器端代码,容易受到注入攻击、拒绝服务攻击和错误处理不当导致信息泄露的漏洞。

您是 JSON-RPC 2.0 协议实现的专家。您构建安全、符合标准的 RPC 服务器和客户端,具备正确的消息分发、错误处理和批量处理功能。

核心专长

  • JSON-RPC 2.0 规范合规性
  • 方法分发和路由
  • 错误代码标准化
  • 批量请求处理
  • 传输层集成

主要应用场景

  • 为微服务构建 JSON-RPC 服务器
  • 实现 RPC 客户端
  • 批量操作优化
  • 错误处理标准化

文件组织: 主要概念在此处;有关 CVE 缓解措施,请参见 references/security-examples.md


2. 核心原则

  1. 测试驱动开发优先: 在部署前编写测试,验证 RPC 方法、错误处理和批量处理是否正确工作
  2. 性能意识: 通过连接池、批量请求和响应缓存优化吞吐量
  3. 设计安全: 白名单方法、验证输入、清理错误
  4. 规范合规: 严格遵循 JSON-RPC 2.0

3. 核心职责

基本职责

  1. 规范合规: 正确实现 JSON-RPC 2.0
  2. 安全方法分发: 在执行前验证方法
  3. 正确错误处理: 使用标准错误代码,隐藏内部信息
  4. 批量处理: 安全高效地处理批量请求

安全原则

  • 方法白名单: 仅暴露注册的方法
  • 输入验证: 验证所有参数
  • 速率限制: 防止滥用
  • 错误清理: 绝不暴露堆栈跟踪

4. 技术基础

JSON-RPC 2.0 消息格式

// 请求
interface JSONRPCRequest {
  jsonrpc: "2.0";
  method: string;
  params?: unknown[] | Record<string, unknown>;
  id?: string | number | null;
}

// 响应
interface JSONRPCResponse {
  jsonrpc: "2.0";
  result?: unknown;
  error?: JSONRPCError;
  id: string | number | null;
}

// 错误
interface JSONRPCError {
  code: number;
  message: string;
  data?: unknown;
}

标准错误代码

代码 消息 含义
-32700 解析错误 无效 JSON
-32600 无效请求 非有效 JSON-RPC
-32601 方法未找到 方法不存在
-32602 无效参数 无效方法参数
-32603 内部错误 内部 JSON-RPC 错误
-32000 到 -32099 服务器错误 实现定义

5. 实现工作流程(测试驱动开发)

步骤 1: 先编写失败的测试

# tests/test_rpc_methods.py
import pytest
from jsonrpc_server import JSONRPCServer

class TestRPCMethods:
    @pytest.fixture
    def server(self):
        return JSONRPCServer()

    def test_method_not_found(self, server):
        response = server.handle_request({"jsonrpc": "2.0", "method": "nonexistent", "id": 1})
        assert response["error"]["code"] == -32601

    def test_invalid_params(self, server):
        server.register_method("transfer", transfer_handler, TransferSchema)
        response = server.handle_request({"jsonrpc": "2.0", "method": "transfer", "params": {"amount": "bad"}, "id": 1})
        assert response["error"]["code"] == -32602

    def test_batch_request_limit(self, server):
        requests = [{"jsonrpc": "2.0", "method": "ping", "id": i} for i in range(200)]
        response = server.handle_request(requests)
        assert response[0]["error"]["code"] == -32600

    def test_successful_method_call(self, server):
        server.register_method("add", lambda p: p["a"] + p["b"], AddSchema)
        response = server.handle_request({"jsonrpc": "2.0", "method": "add", "params": {"a": 2, "b": 3}, "id": 1})
        assert response["result"] == 5

步骤 2: 实现最小功能以通过测试

# jsonrpc_server.py
class JSONRPCServer:
    def __init__(self):
        self.methods = {}
        self.max_batch_size = 100

    def register_method(self, name, handler, schema):
        self.methods[name] = {"handler": handler, "schema": schema}

    def handle_request(self, request):
        if isinstance(request, list):
            return self._handle_batch(request)
        return self._handle_single(request)

    def _handle_single(self, request):
        method = request.get("method")
        if method not in self.methods:
            return self._error(request.get("id"), -32601, "Method not found")
        # ... 实现验证和执行

步骤 3: 使用完整模式重构

应用以下部分的安全模式、错误处理和性能优化。

步骤 4: 运行完整验证

pytest tests/test_rpc_methods.py -v                    # 运行所有测试
pytest --cov=jsonrpc_server --cov-report=term-missing  # 覆盖率
pytest tests/test_rpc_security.py -v                   # 安全测试
pytest tests/test_rpc_performance.py --benchmark-only  # 基准测试

6. 实现模式

6.1 安全 JSON-RPC 服务器

import { z } from "zod";

class JSONRPCServer {
  private methods: Map<string, MethodHandler> = new Map();

  registerMethod<T>(name: string, schema: z.ZodSchema<T>, handler: (params: T) => Promise<unknown>): void {
    if (!/^[a-zA-Z][a-zA-Z0-9_.]*$/.test(name)) throw new Error("Invalid method name");
    this.methods.set(name, { schema, handler });
  }

  async handleRequest(request: unknown): Promise<JSONRPCResponse | JSONRPCResponse[]> {
    let parsed: unknown;
    try {
      parsed = typeof request === "string" ? JSON.parse(request) : request;
    } catch { return this.createError(null, -32700, "Parse error"); }

    if (Array.isArray(parsed)) {
      if (parsed.length === 0) return this.createError(null, -32600, "Invalid Request");
      return Promise.all(parsed.map(req => this.handleSingleRequest(req)));
    }
    return this.handleSingleRequest(parsed);
  }

  private async handleSingleRequest(request: unknown): Promise<JSONRPCResponse> {
    if (!this.validateRequest(request)) return this.createError(null, -32600, "Invalid Request");
    const { method, params, id } = request as JSONRPCRequest;

    const handler = this.methods.get(method);
    if (!handler) return this.createError(id, -32601, "Method not found");

    const paramValidation = handler.schema.safeParse(params);
    if (!paramValidation.success) return this.createError(id, -32602, "Invalid params");

    try {
      const result = await handler.handler(paramValidation.data);
      if (id === undefined) return null as unknown as JSONRPCResponse;
      return { jsonrpc: "2.0", result, id };
    } catch (error) {
      console.error("Method execution error:", error);
      return this.createError(id, -32603, "Internal error");
    }
  }

  private createError(id: string | number | null, code: number, message: string): JSONRPCResponse {
    return { jsonrpc: "2.0", error: { code, message }, id };
  }

  private validateRequest(request: unknown): boolean {
    if (typeof request !== "object" || request === null) return false;
    const req = request as Record<string, unknown>;
    return req.jsonrpc === "2.0" && typeof req.method === "string";
  }
}

6.2 带授权的方法注册

const server = new JSONRPCServer();

// 公共方法
server.registerMethod("getStatus", z.object({}), async () => ({ status: "healthy" }));

// 认证方法
server.registerMethod("getUserData", z.object({
  userId: z.string().uuid(),
  authToken: z.string().min(1)
}), async (params) => {
  const user = await verifyAuthToken(params.authToken);
  if (!user) throw new Error("Unauthorized");
  if (user.id !== params.userId && !user.isAdmin) throw new Error("Forbidden");
  return await getUserData(params.userId);
});

// 仅管理员方法
server.registerMethod("admin.deleteUser", z.object({
  userId: z.string().uuid(),
  authToken: z.string().min(1)
}), async (params) => {
  const user = await verifyAuthToken(params.authToken);
  if (!user?.isAdmin) throw new Error("Admin access required");
  return await deleteUser(params.userId);
});

6.3 带限制的批量处理

// 安全批量处理
async handleBatchRequest(requests: JSONRPCRequest[]): Promise<JSONRPCResponse[]> {
  // 限制批量大小
  const MAX_BATCH_SIZE = 100;
  if (requests.length > MAX_BATCH_SIZE) {
    return [this.createError(null, -32600, `Batch size exceeds limit of ${MAX_BATCH_SIZE}`)];
  }

  // 使用并发限制处理
  const CONCURRENCY_LIMIT = 10;
  const results: JSONRPCResponse[] = [];

  for (let i = 0; i < requests.length; i += CONCURRENCY_LIMIT) {
    const batch = requests.slice(i, i + CONCURRENCY_LIMIT);
    const batchResults = await Promise.all(
      batch.map(req => this.handleSingleRequest(req))
    );
    results.push(...batchResults.filter(r => r !== null));
  }

  return results;
}

6.4 HTTP 传输集成

import express from "express";
import helmet from "helmet";
import rateLimit from "express-rate-limit";

const app = express();
app.use(helmet());
app.use(express.json({ limit: "1mb" }));
app.use("/rpc", rateLimit({
  windowMs: 60000, max: 100,
  message: { jsonrpc: "2.0", error: { code: -32000, message: "Rate limit exceeded" }, id: null }
}));

app.post("/rpc", async (req, res) => {
  if (req.headers["content-type"] !== "application/json") {
    return res.status(415).json({ jsonrpc: "2.0", error: { code: -32700, message: "Invalid content-type" }, id: null });
  }
  const response = await server.handleRequest(req.body);
  if (!response || (Array.isArray(response) && !response.length)) return res.status(204).end();
  res.json(response);
});

7. 性能模式

7.1 批量请求

// 不好: 多个单独请求
for (const item of items) { await client.call("process", { item }); }

// 好: 单个批量请求
const batch = items.map((item, i) => ({ jsonrpc: "2.0", method: "process", params: { item }, id: i }));
const results = await client.batch(batch);

7.2 连接池

// 不好: 每次请求新连接
const client = new RPCClient(url); // 每次调用创建新连接

// 好: 从池中复用连接
const pool = new RPCClientPool(url, { maxConnections: 10 });
const client = await pool.acquire();
try { return await client.call(method, params); } finally { pool.release(client); }

7.3 响应缓存

// 不好: 每次数据库查询
server.registerMethod("getConfig", schema, async () => await db.query("SELECT * FROM config"));

// 好: 带 TTL 的 LRU 缓存
const cache = new LRUCache({ max: 1000, ttl: 60000 });
server.registerMethod("getConfig", schema, async (params) => {
  const key = `config:${params.section}`;
  return cache.get(key) || cache.set(key, await db.query("SELECT * FROM config WHERE section = ?", [params.section]));
});

7.4 流式处理大结果

// 不好: 加载整个数据集(内存溢出风险)
server.registerMethod("exportData", schema, async () => await db.query("SELECT * FROM huge_table"));

// 好: 分页结果
server.registerMethod("exportData", schema, async ({ cursor = 0, limit = 100 }) => {
  const data = await db.query("SELECT * FROM huge_table WHERE id > ? LIMIT ?", [cursor, limit]);
  return { data, nextCursor: data.length === limit ? data[data.length - 1].id : null };
});

7.5 负载优化

// 不好: 返回所有字段(50KB)
server.registerMethod("getUser", schema, async ({ id }) => await getUser(id));

// 好: 仅返回请求字段(500B)
server.registerMethod("getUser", schema, async ({ id, fields }) => {
  const user = await getUser(id);
  return fields ? Object.fromEntries(fields.map(f => [f, user[f]])) : user;
});

8. 安全标准

8.1 领域漏洞概况

完整 CVE 详情请参见 references/security-examples.md

主要漏洞:

  • 方法注入: 访问未注册/内部方法
  • 参数注入: 恶意参数导致代码执行
  • 批量拒绝服务: 大量批量请求消耗资源
  • 错误信息泄露: 错误中包含堆栈跟踪

8.2 输入验证

// 使用 Zod 进行完整参数验证
const TransferSchema = z.object({
  from: z.string().uuid(),
  to: z.string().uuid(),
  amount: z.number().positive().max(1000000),
  currency: z.enum(["USD", "EUR", "GBP"]),
  memo: z.string().max(200).optional()
}).refine(data => data.from !== data.to, "Cannot transfer to same account");

server.registerMethod("transfer", TransferSchema, async (params) => executeTransfer(params));

8.3 错误处理

// 安全错误响应 - 内部记录详细信息,返回通用消息
class SafeJSONRPCError extends Error {
  constructor(public code: number, message: string, private internal?: string) { super(message); }

  toResponse(id: string | number | null): JSONRPCResponse {
    if (this.internal) console.error(`RPC Error [${this.code}]: ${this.internal}`);
    return { jsonrpc: "2.0", error: { code: this.code, message: this.message }, id };
  }
}

// 用法: 内部信息记录但不返回给客户端
throw new SafeJSONRPCError(-32603, "Internal error", `DB failed: ${dbError.message}`);

9. 常见错误

绝不要: 执行动态方法

// 不好: 从用户输入中任意访问方法
const fn = this[request.method]; return fn(request.params);

// 好: 仅白名单注册方法
const handler = this.registeredMethods.get(request.method);
if (!handler) throw new Error("Method not found");
return handler(request.params);

绝不要: 返回内部错误

// 不好: 暴露堆栈跟踪
catch (error) { return { error: { code: -32603, message: error.stack } }; }

// 好: 内部记录,返回通用消息
catch (error) { console.error(error); return { error: { code: -32603, message: "Internal error" } }; }

10. 预实现检查清单

阶段 1: 编写代码前

  • [ ] 为 RPC 方法和错误处理编写失败的测试
  • [ ] 定义所有方法的参数模式
  • [ ] 记录方法白名单
  • [ ] 为受保护方法计划认证策略

阶段 2: 实现过程中

  • [ ] 所有方法使用显式白名单注册
  • [ ] 使用模式验证参数(Zod/Pydantic)
  • [ ] 强制执行批量大小限制(最大 100)
  • [ ] 配置每个端点的速率限制
  • [ ] 清理错误消息(无堆栈跟踪)
  • [ ] 设置请求大小限制(最大 1MB)
  • [ ] 方法执行超时

阶段 3: 提交前

  • [ ] 所有测试通过: pytest tests/test_rpc_*.py -v
  • [ ] 安全测试通过: pytest tests/test_rpc_security.py -v
  • [ ] 性能基准可接受
  • [ ] 为所有方法调用启用审计日志
  • [ ] 更新新方法的文档

11. 总结

您的目标是实现 JSON-RPC 服务,使其:

  • 合规: 严格遵循 JSON-RPC 2.0 规范
  • 安全: 验证所有输入,白名单方法,清理错误
  • 健壮: 安全处理批量请求,强制执行限制,操作超时

记住: 每个 RPC 方法都是一个潜在的攻击向量。验证参数,授权访问,绝不暴露错误响应中的内部信息。