Kalshi量化交易技能Skill trading-kalshi

这个技能用于通过Kalshi的REST API进行量化交易,支持市场数据获取、订单管理、持仓监控等自动化操作,适用于预测市场投资和算法交易。关键词包括Kalshi、API、量化交易、预测市场、算法交易、数据统计分析、股票评估。

算法交易 0 次安装 0 次浏览 更新于 3/9/2026

name: trading-kalshi description: “在Kalshi上执行交易 - 提供完整的REST API访问市场、订单、持仓、余额” emoji: “📈” gates: envs: - KALSHI_EMAIL - KALSHI_PASSWORD

Kalshi交易 - 完整API参考

通过Kalshi的REST API完全访问CFTC监管的预测市场。

文档: https://docs.kalshi.com/welcome Discord: #dev频道获取支持

必需环境变量

KALSHI_EMAIL=your@email.com
KALSHI_PASSWORD=your_password

安装

pip install requests
# 可选: pip install kalshi-python  # 官方SDK

API基础URL

# 生产环境
BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"

# 演示/沙盒环境(用于测试)
DEMO_URL = "https://demo-api.kalshi.co/trade-api/v2"

认证

Kalshi使用邮箱/密码登录,返回有效期为30分钟的Bearer令牌。

登录和令牌管理

import os
import time
import requests

BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"

class KalshiClient:
    def __init__(self):
        self.email = os.getenv("KALSHI_EMAIL")
        self.password = os.getenv("KALSHI_PASSWORD")
        self.token = None
        self.token_expiry = 0
        self.member_id = None

    def _ensure_auth(self):
        """如果令牌过期则刷新(30分钟有效期)"""
        if time.time() > self.token_expiry - 60:
            self._login()

    def _login(self):
        """POST /login - 获取新认证令牌"""
        r = requests.post(f"{BASE_URL}/login", json={
            "email": self.email,
            "password": self.password
        })
        r.raise_for_status()
        data = r.json()
        self.token = data["token"]
        self.member_id = data.get("member_id")
        self.token_expiry = time.time() + 29 * 60  # 29分钟后刷新
        return data

    def _headers(self):
        """获取请求的认证头"""
        self._ensure_auth()
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def logout(self):
        """POST /logout - 使当前令牌无效"""
        r = requests.post(f"{BASE_URL}/logout", headers=self._headers())
        self.token = None
        self.token_expiry = 0
        return r.status_code == 200

# 初始化
client = KalshiClient()

市场数据端点

获取市场列表

def get_markets(
    status: str = "open",    # "open", "closed", "settled"
    series_ticker: str = None,  # 按系列过滤
    limit: int = 100,
    cursor: str = None  # 用于分页
):
    """GET /markets - 列出市场"""
    params = {"status": status, "limit": limit}
    if series_ticker:
        params["series_ticker"] = series_ticker
    if cursor:
        params["cursor"] = cursor

    r = requests.get(f"{BASE_URL}/markets", headers=client._headers(), params=params)
    r.raise_for_status()
    data = r.json()
    return {
        "markets": data.get("markets", []),
        "cursor": data.get("cursor")  # 用于分页
    }

# 示例
markets = get_markets(series_ticker="INXD")  # 标普500每日市场
markets = get_markets(series_ticker="FED")    # 美联储利率决策
markets = get_markets(series_ticker="KXBTC")  # 比特币价格

获取单个市场

def get_market(ticker: str):
    """GET /markets/{ticker} - 单个市场详情"""
    r = requests.get(f"{BASE_URL}/markets/{ticker}", headers=client._headers())
    r.raise_for_status()
    return r.json()["market"]

market = get_market("INXD-24JAN10-T5805")
# 返回: ticker, title, subtitle, status, yes_bid, yes_ask,
#       no_bid, no_ask, volume, open_interest, close_time, result

获取市场订单簿

def get_orderbook(ticker: str, depth: int = 10):
    """GET /markets/{ticker}/orderbook - 完整订单簿"""
    r = requests.get(f"{BASE_URL}/markets/{ticker}/orderbook",
                    headers=client._headers(),
                    params={"depth": depth})
    r.raise_for_status()
    data = r.json()["orderbook"]

    # data["yes"] = YES侧的价格和数量列表 [价格, 数量]
    # data["no"] = NO侧的价格和数量列表 [价格, 数量]
    return data

book = get_orderbook("INXD-24JAN10-T5805")
print(f"Yes买盘: {book['yes']}")  # [[45, 100], [44, 200], ...]
print(f"No卖盘: {book['no']}")

获取市场历史/交易记录

def get_market_history(ticker: str, limit: int = 100):
    """GET /markets/{ticker}/history - 交易历史"""
    r = requests.get(f"{BASE_URL}/markets/{ticker}/history",
                    headers=client._headers(),
                    params={"limit": limit})
    r.raise_for_status()
    return r.json().get("history", [])

trades = get_market_history("INXD-24JAN10-T5805")
for t in trades:
    print(f"{t['created_time']}: {t['count']} @ {t['yes_price']}¢")

获取系列/事件

def get_series():
    """GET /series - 列出所有系列(类别)"""
    r = requests.get(f"{BASE_URL}/series", headers=client._headers())
    r.raise_for_status()
    return r.json().get("series", [])

def get_events(series_ticker: str = None):
    """GET /events - 列出事件"""
    params = {}
    if series_ticker:
        params["series_ticker"] = series_ticker
    r = requests.get(f"{BASE_URL}/events", headers=client._headers(), params=params)
    r.raise_for_status()
    return r.json().get("events", [])

series = get_series()
events = get_events("FED")

订单管理

下单

def place_order(
    ticker: str,
    side: str,           # "yes" 或 "no"
    action: str,         # "buy" 或 "sell"
    count: int,          # 合约数量
    price: int = None,   # 价格以分为单位 (1-99), None表示市价单
    order_type: str = "limit",  # "limit" 或 "market"
    expiration_ts: int = None,  # 可选: GTD过期时间戳
    client_order_id: str = None  # 可选: 您的参考ID
):
    """POST /portfolio/orders - 下单"""
    payload = {
        "ticker": ticker,
        "side": side.lower(),
        "action": action.lower(),
        "count": count,
        "type": order_type
    }

    if order_type == "limit" and price:
        # yes_price始终从YES侧角度
        payload["yes_price"] = price if side.lower() == "yes" else (100 - price)

    if expiration_ts:
        payload["expiration_ts"] = expiration_ts

    if client_order_id:
        payload["client_order_id"] = client_order_id

    r = requests.post(f"{BASE_URL}/portfolio/orders",
                     headers=client._headers(),
                     json=payload)
    r.raise_for_status()
    return r.json()

# 示例
# 以45分买入10个YES合约
result = place_order("INXD-24JAN10-T5805", "yes", "buy", 10, 45)

# 以30分卖出5个NO合约(相当于YES侧70分)
result = place_order("INXD-24JAN10-T5805", "no", "sell", 5, 30)

# 市价单(立即成交)
result = place_order("INXD-24JAN10-T5805", "yes", "buy", 10, order_type="market")

批量创建订单

def batch_create_orders(orders: list):
    """POST /portfolio/orders/batched - 创建多个订单"""
    payload = {"orders": orders}
    r = requests.post(f"{BASE_URL}/portfolio/orders/batched",
                     headers=client._headers(),
                     json=payload)
    r.raise_for_status()
    return r.json()

orders = [
    {"ticker": "INXD-24JAN10-T5805", "side": "yes", "action": "buy", "count": 5, "type": "limit", "yes_price": 40},
    {"ticker": "INXD-24JAN10-T5805", "side": "yes", "action": "buy", "count": 5, "type": "limit", "yes_price": 42},
]
results = batch_create_orders(orders)

修改订单

def amend_order(order_id: str, count: int = None, price: int = None):
    """POST /portfolio/orders/{order_id}/amend - 修改订单"""
    payload = {}
    if count:
        payload["count"] = count
    if price:
        payload["yes_price"] = price

    r = requests.post(f"{BASE_URL}/portfolio/orders/{order_id}/amend",
                     headers=client._headers(),
                     json=payload)
    r.raise_for_status()
    return r.json()

减少订单数量

def decrease_order(order_id: str, reduce_by: int):
    """POST /portfolio/orders/{order_id}/decrease - 减少订单数量"""
    r = requests.post(f"{BASE_URL}/portfolio/orders/{order_id}/decrease",
                     headers=client._headers(),
                     json={"reduce_by": reduce_by})
    r.raise_for_status()
    return r.json()

取消订单

def cancel_order(order_id: str):
    """DELETE /portfolio/orders/{order_id} - 取消单个订单"""
    r = requests.delete(f"{BASE_URL}/portfolio/orders/{order_id}",
                       headers=client._headers())
    return r.status_code in [200, 204]

def batch_cancel_orders(order_ids: list):
    """DELETE /portfolio/orders/batched - 取消多个订单"""
    r = requests.delete(f"{BASE_URL}/portfolio/orders/batched",
                       headers=client._headers(),
                       json={"order_ids": order_ids})
    r.raise_for_status()
    return r.json()

# 取消特定订单
cancel_order("abc123-order-id")

# 取消多个
batch_cancel_orders(["order-1", "order-2", "order-3"])

获取订单

def get_orders(
    ticker: str = None,
    status: str = None,  # "resting", "canceled", "executed", "pending"
    limit: int = 100
):
    """GET /portfolio/orders - 列出订单"""
    params = {"limit": limit}
    if ticker:
        params["ticker"] = ticker
    if status:
        params["status"] = status

    r = requests.get(f"{BASE_URL}/portfolio/orders",
                    headers=client._headers(),
                    params=params)
    r.raise_for_status()
    return r.json().get("orders", [])

def get_order(order_id: str):
    """GET /portfolio/orders/{order_id} - 单个订单"""
    r = requests.get(f"{BASE_URL}/portfolio/orders/{order_id}",
                    headers=client._headers())
    r.raise_for_status()
    return r.json()["order"]

# 获取所有未成交订单
orders = get_orders(status="resting")
for o in orders:
    print(f"{o['order_id']}: {o['action']} {o['side']} {o['remaining_count']} @ {o['yes_price']}¢")

投资组合管理

获取余额

def get_balance():
    """GET /portfolio/balance - 账户余额"""
    r = requests.get(f"{BASE_URL}/portfolio/balance", headers=client._headers())
    r.raise_for_status()
    data = r.json()

    return {
        "balance": data.get("balance", 0) / 100,  # 可用余额以美元为单位
        "portfolio_value": data.get("portfolio_value", 0) / 100
    }

bal = get_balance()
print(f"可用: ${bal['balance']:.2f}")
print(f"投资组合: ${bal['portfolio_value']:.2f}")

获取持仓

def get_positions(limit: int = 100):
    """GET /portfolio/positions - 当前持仓"""
    r = requests.get(f"{BASE_URL}/portfolio/positions",
                    headers=client._headers(),
                    params={"limit": limit})
    r.raise_for_status()
    return r.json().get("market_positions", [])

positions = get_positions()
for p in positions:
    if p.get("position", 0) != 0:
        print(f"{p['ticker']}: {p['position']} 合约 @ 均价 {p['average_price']}¢")
        print(f"  已实现盈亏: ${p.get('realized_pnl', 0) / 100:.2f}")

获取成交记录(交易历史)

def get_fills(
    ticker: str = None,
    limit: int = 100,
    cursor: str = None
):
    """GET /portfolio/fills - 已执行的交易"""
    params = {"limit": limit}
    if ticker:
        params["ticker"] = ticker
    if cursor:
        params["cursor"] = cursor

    r = requests.get(f"{BASE_URL}/portfolio/fills",
                    headers=client._headers(),
                    params=params)
    r.raise_for_status()
    data = r.json()
    return {
        "fills": data.get("fills", []),
        "cursor": data.get("cursor")
    }

fills = get_fills()
for f in fills["fills"]:
    print(f"{f['created_time']}: {f['action']} {f['side']} {f['count']} @ {f['price']}¢")

获取结算记录

def get_settlements(limit: int = 100):
    """GET /portfolio/settlements - 结算历史"""
    r = requests.get(f"{BASE_URL}/portfolio/settlements",
                    headers=client._headers(),
                    params={"limit": limit})
    r.raise_for_status()
    return r.json().get("settlements", [])

settlements = get_settlements()
for s in settlements:
    print(f"{s['ticker']}: 结算于 {s['settlement_value']}¢, 盈亏: ${s['revenue'] / 100:.2f}")

交易所状态

def get_exchange_status():
    """GET /exchange/status - 交易所操作状态"""
    r = requests.get(f"{BASE_URL}/exchange/status", headers=client._headers())
    r.raise_for_status()
    return r.json()

status = get_exchange_status()
print(f"交易: {status.get('trading_active')}")
print(f"交易所开放: {status.get('exchange_active')}")

WebSocket(实时数据)

对于实时更新,在REST认证后使用WebSocket:

import websocket
import json

def on_message(ws, message):
    data = json.loads(message)
    print(f"更新: {data}")

def on_open(ws):
    # 订阅订单簿更新
    ws.send(json.dumps({
        "type": "subscribe",
        "channel": "orderbook",
        "ticker": "INXD-24JAN10-T5805"
    }))

# 用认证令牌连接
ws = websocket.WebSocketApp(
    f"wss://trading-api.kalshi.com/trade-api/ws/v2?token={client.token}",
    on_message=on_message,
    on_open=on_open
)
ws.run_forever()

完整交易机器人

#!/usr/bin/env python3
"""
生产环境Kalshi交易机器人
"""

import os
import time
import requests

BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"

class KalshiBot:
    def __init__(self):
        self.email = os.getenv("KALSHI_EMAIL")
        self.password = os.getenv("KALSHI_PASSWORD")
        self.token = None
        self.token_expiry = 0

    def _auth(self):
        if time.time() > self.token_expiry - 60:
            r = requests.post(f"{BASE_URL}/login", json={
                "email": self.email, "password": self.password
            })
            r.raise_for_status()
            self.token = r.json()["token"]
            self.token_expiry = time.time() + 29 * 60

    def _h(self):
        self._auth()
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"}

    def get_market(self, ticker):
        r = requests.get(f"{BASE_URL}/markets/{ticker}", headers=self._h())
        r.raise_for_status()
        return r.json()["market"]

    def get_positions(self):
        r = requests.get(f"{BASE_URL}/portfolio/positions", headers=self._h())
        r.raise_for_status()
        return {p["ticker"]: p for p in r.json().get("market_positions", [])}

    def get_balance(self):
        r = requests.get(f"{BASE_URL}/portfolio/balance", headers=self._h())
        r.raise_for_status()
        return r.json().get("balance", 0) / 100

    def buy(self, ticker, side, count, price):
        payload = {
            "ticker": ticker, "side": side, "action": "buy",
            "count": count, "type": "limit",
            "yes_price": price if side == "yes" else (100 - price)
        }
        r = requests.post(f"{BASE_URL}/portfolio/orders", headers=self._h(), json=payload)
        return r.json() if r.status_code == 200 else {"error": r.text}

    def sell(self, ticker, side, count, price):
        payload = {
            "ticker": ticker, "side": side, "action": "sell",
            "count": count, "type": "limit",
            "yes_price": price if side == "yes" else (100 - price)
        }
        r = requests.post(f"{BASE_URL}/portfolio/orders", headers=self._h(), json=payload)
        return r.json() if r.status_code == 200 else {"error": r.text}

# 运行
bot = KalshiBot()
TICKER = "INXD-24JAN10-T5805"

while True:
    try:
        market = bot.get_market(TICKER)
        positions = bot.get_positions()
        balance = bot.get_balance()

        yes_bid = market["yes_bid"]
        yes_ask = market["yes_ask"]
        pos = positions.get(TICKER, {}).get("position", 0)

        print(f"余额: ${balance:.2f}, 持仓: {pos}, 价格: {yes_bid}/{yes_ask}")

        # 交易逻辑
        if yes_ask < 40 and pos < 10 and balance > 5:
            print(f"买入 at {yes_ask}")
            bot.buy(TICKER, "yes", 5, yes_ask)
        elif yes_bid > 60 and pos > 0:
            print(f"卖出 at {yes_bid}")
            bot.sell(TICKER, "yes", pos, yes_bid)

        time.sleep(10)

    except Exception as e:
        print(f"错误: {e}")
        time.sleep(30)

热门市场系列

系列 描述 示例代码
FED 美联储利率决策 FED-24MAR-T525
INXD 标普500每日收盘 INXD-24JAN10-T5805
KXBTC 比特币价格区间 KXBTC-24JAN-T45000
KXETH 以太坊价格 KXETH-24JAN-T2500
CPI 通胀数据 CPI-24JAN-T3.5
GDP GDP增长 GDP-24Q1-T2.0
NFP 非农就业数据 NFP-24JAN-T200K

关键说明

  1. 价格以分为单位 - 45表示每合约0.45美元
  2. 合约若正确则支付1美元 - 成本为价格,利润为1美元 - 价格
  3. 无交易费用 - 仅价差重要
  4. 令牌30分钟后过期 - 在过期前自动刷新
  5. 仅限美国居民 - 需要KYC验证
  6. 存在速率限制 - 对429错误实施指数退避
  7. 最大持仓限制 - 随市场变化,检查市场详情

CLI包装器

用于快速CLI访问:

python trading/kalshi.py search "fed rate"
python trading/kalshi.py market <ticker>
python trading/kalshi.py buy <ticker> <side> <count> <price>
python trading/kalshi.py sell <ticker> <side> <count> <price>
python trading/kalshi.py positions
python trading/kalshi.py balance
python trading/kalshi.py orders
python trading/kalshi.py cancel <order_id>