数据集成管理技能Skill integrations

这个技能用于管理外部数据源、连接器和自定义数据流,为量化交易机器人提供实时数据集成服务,支持REST API、WebSocket和Webhook等多种集成方式,提升交易策略的数据驱动能力。关键词:数据集成、API连接、量化交易、实时数据流、外部数据源、交易机器人、数据管理、量化金融、股票分析。

量化策略 0 次安装 0 次浏览 更新于 3/9/2026

名称: 集成 描述: “外部数据源、连接器和自定义数据流” 表情符号: “🔗”

集成 - 完整 API 参考

管理外部数据源、添加自定义连接器,并为交易机器人集成新数据流。


聊天命令

列出数据源

/集成                               列出所有数据源
/集成 状态                          显示源健康状况
/集成 源                            可用源类型

管理源

/集成 启用 fedwatch                启用 CME FedWatch
/集成 禁用 538                     禁用 FiveThirtyEight
/集成 添加 webhook "我的信号"      添加自定义 webhook 源
/集成 添加 rest "我的API" <URL>     添加 REST API 源
/集成 删除 <源ID>                   删除数据源

配置源

/集成 配置 fedwatch                查看源配置
/集成 设置 fedwatch 间隔 60        设置刷新间隔
/集成 设置 fedwatch 密钥 <API密钥>  设置 API 密钥
/集成 测试 <源ID>                  测试源连接

查看数据

/集成 数据 fedwatch                来自源的最新数据
/集成 历史 <源> --小时 24          历史数据
/集成 订阅 <源>                    实时更新

TypeScript API 参考

创建集成管理器

import { createIntegrationsManager } from 'clodds/integrations';

const integrations = createIntegrationsManager({
  // 存储
  storage: 'sqlite',
  dbPath: './integrations.db',

  // 默认刷新间隔
  defaultIntervalMs: 60000,

  // 重试设置
  maxRetries: 3,
  retryDelayMs: 5000,
});

内置数据源

// 启用内置源
await integrations.enable('fedwatch');     // CME FedWatch
await integrations.enable('538');          // FiveThirtyEight
await integrations.enable('silver');       // Silver Bulletin
await integrations.enable('rcp');          // RealClearPolitics
await integrations.enable('odds-api');     // The Odds API
await integrations.enable('polymarket');   // Polymarket 价格
await integrations.enable('kalshi');       // Kalshi 价格
await integrations.enable('binance');      // Binance 现货价格
await integrations.enable('crypto');       // 多交易所加密

添加自定义 Webhook 源

// 添加 webhook 接收自定义信号
const source = await integrations.addWebhook({
  name: '我的信号',
  description: '自定义交易信号',

  // Webhook 配置
  path: '/webhooks/我的信号',
  secret: process.env.WEBHOOK_SECRET,

  // 数据模式(可选验证)
  schema: {
    type: 'object',
    properties: {
      signal: { type: 'string', enum: ['买入', '卖出', '持有'] },
      symbol: { type: 'string' },
      confidence: { type: 'number', min: 0, max: 1 },
    },
    required: ['signal', 'symbol'],
  },

  // 转换传入数据
  transform: (payload) => ({
    signal: payload.signal,
    symbol: payload.symbol,
    confidence: payload.confidence || 0.5,
    timestamp: Date.now(),
  }),
});

console.log(`Webhook URL: ${source.url}`);
// POST 到: https://your-domain.com/webhooks/我的信号

添加自定义 REST 源

// 添加 REST API 数据源
const source = await integrations.addRest({
  name: '我的API',
  description: '自定义价格 API',

  // API 配置
  url: 'https://api.example.com/prices',
  method: 'GET',
  headers: {
    'Authorization': `Bearer ${process.env.MY_API_KEY}`,
  },

  // 轮询间隔
  intervalMs: 30000,

  // 转换响应
  transform: (response) => ({
    price: response.data.price,
    volume: response.data.volume,
    timestamp: Date.now(),
  }),
});

添加 WebSocket 源

// 添加 WebSocket 数据源
const source = await integrations.addWebSocket({
  name: '实时价格',
  description: '实时价格馈送',

  // WebSocket 配置
  url: 'wss://stream.example.com/prices',

  // 消息处理器
  onMessage: (data) => ({
    type: '价格',
    symbol: data.s,
    price: parseFloat(data.p),
    timestamp: data.t,
  }),

  // 订阅消息
  subscribe: {
    method: '订阅',
    params: ['btcusdt@trade'],
  },

  // 重连设置
  reconnect: true,
  reconnectIntervalMs: 5000,
});

订阅数据

// 订阅实时更新
integrations.subscribe('我的信号', (data) => {
  console.log(`信号: ${data.signal} ${data.symbol}`);
  console.log(`置信度: ${data.confidence}`);

  if (data.signal === '买入' && data.confidence > 0.8) {
    // 执行交易逻辑
  }
});

// 订阅多个源
integrations.subscribeAll(['fedwatch', 'crypto', '我的信号'], (source, data) => {
  console.log(`[${source}] ${JSON.stringify(data)}`);
});

获取最新数据

// 从源获取当前数据
const fedData = await integrations.getData('fedwatch');

console.log('美联储利率概率:');
for (const meeting of fedData.meetings) {
  console.log(`${meeting.date}: ${meeting.probabilities}`);
}

// 带新鲜度检查获取
const data = await integrations.getData('crypto', {
  maxAgeMs: 60000,  // 如果超过 60 秒则重新获取
});

检查状态

// 获取源状态
const status = await integrations.getStatus('我的API');

console.log(`状态: ${status.status}`);  // '健康' | '降级' | '错误'
console.log(`最后获取: ${status.lastFetch}`);
console.log(`最后错误: ${status.lastError}`);
console.log(`获取次数: ${status.fetchCount}`);
console.log(`错误次数: ${status.errorCount}`);

// 获取所有状态
const all = await integrations.getAllStatuses();

内置数据源

类型 数据 刷新
fedwatch REST 美联储利率概率 5 分钟
538 REST 选举预测 1 小时
silver REST Silver Bulletin 预测 1 小时
rcp REST 民意调查平均值 15 分钟
odds-api REST 体育博彩赔率 1 分钟
polymarket WebSocket 市场价格 实时
kalshi WebSocket 市场价格 实时
binance WebSocket 加密价格 实时

自定义源类型

类型 最适合 延迟
webhook 外部信号推送到您 即时
rest 定期轮询的 API 秒级
websocket 实时流数据 毫秒级

在机器人中使用数据

import { createTradingBot } from 'clodds/trading';
import { createIntegrationsManager } from 'clodds/integrations';

const integrations = createIntegrationsManager();
const bot = createTradingBot();

// 在机器人策略中使用自定义信号
integrations.subscribe('我的信号', async (signal) => {
  if (signal.signal === '买入' && signal.confidence > 0.9) {
    await bot.execute({
      platform: 'polymarket',
      market: signal.symbol,
      side: 'YES',
      size: 100 * signal.confidence,
    });
  }
});

// 使用美联储数据进行宏观投注
integrations.subscribe('fedwatch', async (data) => {
  const cutProb = data.meetings[0].probabilities['25bp_cut'];
  if (cutProb > 0.8) {
    // 利率削减的高概率
    await bot.execute({
      platform: 'kalshi',
      market: 'fed-rate-cut',
      side: 'YES',
      size: 500,
    });
  }
});

环境变量

# 内置源
CME_FEDWATCH_API_KEY=您的密钥
FIVETHIRTYEIGHT_API_KEY=您的密钥
ODDS_API_KEY=您的密钥

# 自定义源
MY_SIGNALS_WEBHOOK_SECRET=您的秘密
MY_API_KEY=您的密钥

最佳实践

  1. 验证传入数据 — 为 webhook 使用模式
  2. 设置适当的间隔 — 不要轮询太频繁
  3. 优雅处理错误 — 源有时会失败
  4. 监控新鲜度 — 陈旧数据警报
  5. 一致转换 — 规范化数据格式
  6. 使用 WebSocket 降低延迟 — 当毫秒重要时