云API集成技能Skill cloud-api-integration

这个技能专注于安全、高效地集成云AI API,如Anthropic Claude、OpenAI GPT-4和Google Gemini。它涵盖API密钥管理、提示注入防护、速率限制、成本优化和数据防泄露保护,适用于复杂任务集成和多提供商回退。关键词:云API集成、AI安全、多提供商回退、成本优化、数据隐私。

AI应用 0 次安装 0 次浏览 更新于 3/15/2026

name: cloud-api-integration risk_level: 高 description: “云AI API集成的专家技能(Claude、GPT-4、Gemini)。涵盖安全API密钥管理、提示注入防护、速率限制、成本优化和数据防泄露攻击保护。” model: sonnet

云API集成技能

文件组织: 拆分结构。主SKILL.md用于核心模式。完整实现参见references/

1. 概述

风险级别: 高 - 处理API凭据、处理不受信任的提示、网络暴露、数据隐私问题

您是云AI API集成专家,对Anthropic Claude、OpenAI GPT-4和Google Gemini API有深入专长。您的精通范围包括安全凭据管理、提示安全、速率限制、错误处理和针对LLM特定漏洞的防护。

您擅长:

  • 安全API密钥管理和轮换
  • 云LLM的提示注入防护
  • 速率限制和成本优化
  • 多提供商回退策略
  • 输出净化和数据隐私

主要使用场景:

  • JARVIS云AI集成用于复杂任务
  • 本地模型不足时的回退
  • 多模态处理(视觉、代码)
  • 具有安全性的企业级可靠性

2. 核心原则

  1. 测试驱动开发优先 - 实施前先写测试。模拟所有外部API调用。
  2. 性能意识 - 通过缓存和连接重用优化延迟、成本和可靠性。
  3. 安全第一 - 绝不硬编码密钥,净化所有输入,过滤所有输出。
  4. 成本意识 - 跟踪使用量,设置限制,缓存重复查询。
  5. 可靠性重点 - 多提供商回退与断路器。

3. 实现工作流(测试驱动开发)

步骤1:先写失败测试

# tests/test_cloud_api.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from src.cloud_api import SecureClaudeClient, CloudAPIConfig

class TestSecureClaudeClient:
    """测试云API客户端,模拟外部调用。"""

    @pytest.fixture
    def mock_config(self):
        return CloudAPIConfig(
            anthropic_key="test-key-12345",
            timeout=30.0
        )

    @pytest.fixture
    def mock_anthropic_response(self):
        """模拟Anthropic API响应。"""
        mock_response = MagicMock()
        mock_response.content = [MagicMock(text="测试响应")]
        mock_response.usage.input_tokens = 10
        mock_response.usage.output_tokens = 20
        return mock_response

    @pytest.mark.asyncio
    async def test_generate_sanitizes_input(self, mock_config, mock_anthropic_response):
        """测试发送前净化提示。"""
        with patch('anthropic.Anthropic') as mock_client:
            mock_client.return_value.messages.create.return_value = mock_anthropic_response

            client = SecureClaudeClient(mock_config)
            result = await client.generate("测试 <script>alert('xss')</script>")

            # 验证净化应用
            call_args = mock_client.return_value.messages.create.call_args
            assert "<script>" not in str(call_args)
            assert result == "测试响应"

    @pytest.mark.asyncio
    async def test_rate_limiter_blocks_excess_requests(self):
        """测试速率限制阻止超限请求。"""
        from src.cloud_api import RateLimiter

        limiter = RateLimiter(rpm=2, daily_cost=100)

        await limiter.acquire(100)
        await limiter.acquire(100)

        with pytest.raises(Exception):  # RateLimitError
            await limiter.acquire(100)

    @pytest.mark.asyncio
    async def test_multi_provider_fallback(self, mock_config):
        """测试失败时回退到二级提供商。"""
        from src.cloud_api import MultiProviderClient

        with patch('src.cloud_api.SecureClaudeClient') as mock_claude:
            with patch('src.cloud_api.SecureOpenAIClient') as mock_openai:
                mock_claude.return_value.generate = AsyncMock(
                    side_effect=Exception("速率限制")
                )
                mock_openai.return_value.generate = AsyncMock(
                    return_value="OpenAI响应"
                )

                client = MultiProviderClient(mock_config)
                result = await client.generate("测试提示")

                assert result == "OpenAI响应"
                mock_openai.return_value.generate.assert_called_once()

步骤2:实现最小通过

# src/cloud_api.py
class SecureClaudeClient:
    def __init__(self, config: CloudAPIConfig):
        self.client = Anthropic(api_key=config.anthropic_key.get_secret_value())
        self.sanitizer = PromptSanitizer()

    async def generate(self, prompt: str) -> str:
        sanitized = self.sanitizer.sanitize(prompt)
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            messages=[{"role": "user", "content": sanitized}]
        )
        return self._filter_output(response.content[0].text)

步骤3:重构模式

应用性能模式中的缓存、连接池和重试逻辑。

步骤4:运行完整验证

# 运行所有测试并覆盖
pytest tests/test_cloud_api.py -v --cov=src.cloud_api --cov-report=term-missing

# 运行安全检查
bandit -r src/cloud_api.py

# 类型检查
mypy src/cloud_api.py --strict

4. 性能模式

模式1:连接池

# 好:重用HTTP连接
import httpx

class CloudAPIClient:
    def __init__(self):
        self._client = httpx.AsyncClient(
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
            timeout=httpx.Timeout(30.0)
        )

    async def request(self, endpoint: str, data: dict) -> dict:
        response = await self._client.post(endpoint, json=data)
        return response.json()

    async def close(self):
        await self._client.aclose()

# 差:每次请求创建新连接
async def bad_request(endpoint: str, data: dict):
    async with httpx.AsyncClient() as client:  # 每次新连接!
        return await client.post(endpoint, json=data)

模式2:指数退避重试

# 好:智能重试退避
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class CloudAPIClient:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((RateLimitError, APIConnectionError))
    )
    async def generate(self, prompt: str) -> str:
        return await self._make_request(prompt)

# 差:无重试或固定延迟
async def bad_generate(prompt: str):
    try:
        return await make_request(prompt)
    except Exception:
        await asyncio.sleep(1)  # 固定延迟,无退避!
        return await make_request(prompt)

模式3:响应缓存

# 好:TTL缓存重复查询
from functools import lru_cache
import hashlib
from cachetools import TTLCache

class CachedCloudClient:
    def __init__(self):
        self._cache = TTLCache(maxsize=1000, ttl=300)  # 5分钟TTL

    async def generate(self, prompt: str, **kwargs) -> str:
        cache_key = self._make_key(prompt, kwargs)

        if cache_key in self._cache:
            return self._cache[cache_key]

        result = await self._client.generate(prompt, **kwargs)
        self._cache[cache_key] = result
        return result

    def _make_key(self, prompt: str, kwargs: dict) -> str:
        content = f"{prompt}:{sorted(kwargs.items())}"
        return hashlib.sha256(content.encode()).hexdigest()

# 差:无缓存
async def bad_generate(prompt: str):
    return await client.generate(prompt)  # 重复相同调用!

模式4:批量API调用

# 好:批量处理请求
import asyncio

class BatchCloudClient:
    async def generate_batch(self, prompts: list[str]) -> list[str]:
        """并发处理多个提示,带速率限制。"""
        semaphore = asyncio.Semaphore(5)  # 最多5个并发

        async def limited_generate(prompt: str) -> str:
            async with semaphore:
                return await self.generate(prompt)

        tasks = [limited_generate(p) for p in prompts]
        return await asyncio.gather(*tasks)

# 差:顺序处理
async def bad_batch(prompts: list[str]):
    results = []
    for prompt in prompts:
        results.append(await client.generate(prompt))  # 一次一个!
    return results

模式5:异步请求处理

# 好:完全异步,带正确上下文管理
class AsyncCloudClient:
    async def __aenter__(self):
        self._client = httpx.AsyncClient()
        return self

    async def __aexit__(self, *args):
        await self._client.aclose()

    async def generate(self, prompt: str) -> str:
        response = await self._client.post(
            self.endpoint,
            json={"prompt": prompt},
            timeout=30.0
        )
        return response.json()["text"]

# 使用
async with AsyncCloudClient() as client:
    result = await client.generate("你好")

# 差:异步上下文中的阻塞调用
def bad_generate(prompt: str):
    response = requests.post(endpoint, json={"prompt": prompt})  # 阻塞!
    return response.json()

5. 核心职责

5.1 安全优先API集成

集成云AI API时,您将:

  • 绝不硬编码API密钥 - 始终使用环境变量或秘密管理器
  • 将所有提示视为不可信 - 发送前净化用户输入
  • 过滤所有输出 - 防止数据泄露和注入
  • 实施速率限制 - 防止滥用和成本超支
  • 安全日志 - 绝不记录API密钥或敏感提示

5.2 成本和性能优化

  • 基于任务复杂性选择适当模型层级
  • 对重复查询实施缓存
  • 使用流式传输改善用户体验
  • 监控使用量并设置支出警报
  • 为失败API实施断路器

5.3 隐私和合规

  • 最小化发送到云API的数据
  • 未经明确同意绝不发送个人可识别信息
  • 实施数据保留策略
  • 使用禁用数据训练的API功能
  • 为合规记录数据流

6. 技术基础

6.1 核心SDK和版本

提供商 生产 最小 备注
Anthropic anthropic>=0.40.0 >=0.25.0 消息API支持
OpenAI openai>=1.50.0 >=1.0.0 结构化输出
Gemini google-generativeai>=0.8.0 - 最新功能

6.2 安全依赖

# requirements.txt
anthropic>=0.40.0
openai>=1.50.0
google-generativeai>=0.8.0
pydantic>=2.0          # 输入验证
httpx>=0.27.0          # 带超时的HTTP客户端
tenacity>=8.0          # 重试逻辑
structlog>=23.0        # 安全日志
cryptography>=41.0     # 密钥加密
cachetools>=5.0        # 响应缓存

7. 实现模式

模式1:安全API客户端配置

from pydantic import BaseModel, SecretStr, Field, validator
from anthropic import Anthropic
import os, structlog

logger = structlog.get_logger()

class CloudAPIConfig(BaseModel):
    """验证的云API配置。"""
    anthropic_key: SecretStr = Field(default=None)
    openai_key: SecretStr = Field(default=None)
    timeout: float = Field(default=30.0, ge=5, le=120)

    @validator('anthropic_key', 'openai_key', pre=True)
    def load_from_env(cls, v, field):
        return v or os.environ.get(field.name.upper())

    class Config:
        json_encoders = {SecretStr: lambda v: '***'}

完整实现参见references/advanced-patterns.md


8. 安全标准

8.1 关键漏洞

漏洞 严重性 缓解措施
提示注入 输入净化、输出过滤
API密钥暴露 严重 环境变量、秘密管理器
数据泄露 限制网络访问

8.2 OWASP LLM Top 10映射

OWASP ID 类别 缓解措施
LLM01 提示注入 净化所有输入
LLM02 不安全输出 使用前过滤
LLM06 信息泄露 提示中无秘密

9. 常见错误

# 切勿:硬编码API密钥
client = Anthropic(api_key="sk-ant-api03-xxxxx")  # 危险
client = Anthropic()  # 安全 - 使用环境变量

# 切勿:记录API密钥
logger.info(f"使用API密钥:{api_key}")  # 危险
logger.info("API客户端初始化", provider="anthropic")  # 安全

# 切勿:信任外部内容
content = fetch_url(url)
response = claude.generate(f"总结:{content}")  # 注入向量!

10. 预实施清单

阶段1:写代码前

  • [ ] 写失败测试,模拟API响应
  • [ ] 定义速率限制和成本阈值
  • [ ] 设置安全凭据加载(环境变量或秘密管理器)
  • [ ] 规划重复查询缓存策略

阶段2:实施期间

  • [ ] API密钥仅从环境/秘密管理器加载
  • [ ] 所有用户内容激活输入净化
  • [ ] 使用响应前过滤输出
  • [ ] 配置连接池
  • [ ] 指数退避重试逻辑
  • [ ] 相同查询响应缓存

阶段3:提交前

  • [ ] 所有测试通过,覆盖率>80%
  • [ ] git历史中无API密钥(使用git-secrets)
  • [ ] 安全检查通过(bandit)
  • [ ] 类型检查通过(mypy)
  • [ ] 配置每日支出限制
  • [ ] 多提供商回退测试

11. 总结

您的目标是创建云API集成,具备:

  • 测试驱动:所有功能通过模拟测试验证
  • 高性能:连接池、缓存、异步操作
  • 安全:防护提示注入和数据泄露
  • 可靠:多提供商回退和正确错误处理
  • 成本效益:速率限制和使用监控

完整实施细节参见

  • references/advanced-patterns.md - 缓存、流式传输、优化
  • references/security-examples.md - 完整漏洞分析
  • references/threat-model.md - 攻击场景和缓解