---
name: trading-kalshi
description: "在Kalshi上执行交易 - 提供完整的REST API访问市场、订单、持仓、余额"
emoji: "📈"
gates:
  envs:
    - KALSHI_EMAIL
    - KALSHI_PASSWORD
---
Kalshi交易 - 完整API参考
通过Kalshi的REST API完全访问CFTC监管的预测市场。
文档: https://docs.kalshi.com/welcome Discord: #dev频道获取支持
必需环境变量
KALSHI_EMAIL=your@email.com
KALSHI_PASSWORD=your_password
安装
pip install requests
# 可选: pip install kalshi-python # 官方SDK
API基础URL
# Production environment
BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"
# Demo/sandbox environment (for testing)
DEMO_URL = "https://demo-api.kalshi.co/trade-api/v2"
认证
Kalshi使用邮箱/密码登录,返回有效期为30分钟的Bearer令牌。
登录和令牌管理
import os
import time
import requests
BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"
class KalshiClient:
    """Kalshi REST client that logs in lazily and caches the bearer token.

    Credentials are read from the ``KALSHI_EMAIL`` and ``KALSHI_PASSWORD``
    environment variables when the client is constructed.
    """

    # Seconds to wait on login/logout calls so a stalled connection cannot
    # hang the caller forever.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.email = os.getenv("KALSHI_EMAIL")
        self.password = os.getenv("KALSHI_PASSWORD")
        self.token = None
        self.token_expiry = 0  # epoch seconds; 0 forces a login on first use
        self.member_id = None

    def _ensure_auth(self):
        """Log in again when the token is missing or within 60 s of expiry."""
        if time.time() > self.token_expiry - 60:
            self._login()

    def _login(self):
        """POST /login and cache the returned token.

        Returns:
            The decoded JSON body of the login response.

        Raises:
            RuntimeError: if either credential environment variable is unset.
            requests.HTTPError: if the server answers with a 4xx/5xx status.
            KeyError: if the response body has no ``token`` field.
        """
        # Fail early with a clear message instead of posting null credentials.
        if not self.email or not self.password:
            raise RuntimeError("KALSHI_EMAIL and KALSHI_PASSWORD must be set")
        r = requests.post(
            f"{BASE_URL}/login",
            json={"email": self.email, "password": self.password},
            timeout=self.REQUEST_TIMEOUT,
        )
        r.raise_for_status()
        data = r.json()
        self.token = data["token"]
        self.member_id = data.get("member_id")
        # Tokens last 30 minutes; schedule the refresh at 29 minutes.
        self.token_expiry = time.time() + 29 * 60
        return data

    def _auth_headers(self):
        """Build request headers from the cached token without refreshing it."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    def _headers(self):
        """Return auth headers, logging in first when the token needs a refresh."""
        self._ensure_auth()
        return self._auth_headers()

    def logout(self):
        """POST /logout to invalidate the current token, then forget it locally.

        Returns:
            True when there was no session to end or the server answered 200,
            False otherwise.
        """
        # Nothing to invalidate: do not log in just to log out again.
        if self.token is None:
            return True
        r = requests.post(
            f"{BASE_URL}/logout",
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        self.token = None
        self.token_expiry = 0
        return r.status_code == 200
# Initialise the shared client (no network call happens until first use)
client = KalshiClient()
市场数据端点
获取市场列表
def get_markets(
    status: str = "open",  # "open", "closed", "settled"
    series_ticker: str = None,  # filter by series
    limit: int = 100,
    cursor: str = None  # pagination cursor
):
    """GET /markets - list one page of markets.

    Returns:
        A dict with ``markets`` (list, empty when absent from the response)
        and ``cursor`` (value to pass back in for the next page, or None).

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    query = {"status": status, "limit": limit}
    # Optional filters are only sent when they have a truthy value.
    optional = {"series_ticker": series_ticker, "cursor": cursor}
    query.update({key: value for key, value in optional.items() if value})

    response = requests.get(
        f"{BASE_URL}/markets", headers=client._headers(), params=query
    )
    response.raise_for_status()
    body = response.json()
    return {"markets": body.get("markets", []), "cursor": body.get("cursor")}
# Examples
markets = get_markets(series_ticker="INXD") # S&P 500 daily markets
markets = get_markets(series_ticker="FED") # Federal Reserve rate decisions
markets = get_markets(series_ticker="KXBTC") # Bitcoin price
获取单个市场
def get_market(ticker: str):
    """GET /markets/{ticker} - fetch the details of a single market.

    Returns:
        The ``market`` object from the response body.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        KeyError: if the response body has no ``market`` field.
    """
    url = f"{BASE_URL}/markets/{ticker}"
    response = requests.get(url, headers=client._headers())
    response.raise_for_status()
    body = response.json()
    return body["market"]
market = get_market("INXD-24JAN10-T5805")
# Returns: ticker, title, subtitle, status, yes_bid, yes_ask,
# no_bid, no_ask, volume, open_interest, close_time, result
获取市场订单簿
def get_orderbook(ticker: str, depth: int = 10):
    """GET /markets/{ticker}/orderbook - fetch the order book of one market.

    Args:
        ticker: market ticker used in the URL path.
        depth: sent as the ``depth`` query parameter.

    Returns:
        The ``orderbook`` object from the response. Its ``yes`` and ``no``
        entries are lists of [price, quantity] pairs for each side.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/markets/{ticker}/orderbook"
    response = requests.get(
        endpoint, headers=client._headers(), params={"depth": depth}
    )
    response.raise_for_status()
    return response.json()["orderbook"]
# Fetch the book for one market and print both sides.
book = get_orderbook("INXD-24JAN10-T5805")
print(f"Yes买盘: {book['yes']}") # [[45, 100], [44, 200], ...]
print(f"No卖盘: {book['no']}")
获取市场历史/交易记录
def get_market_history(ticker: str, limit: int = 100):
    """GET /markets/{ticker}/history - fetch the trade history of one market.

    Returns:
        The ``history`` list from the response, or an empty list when the
        field is absent.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/markets/{ticker}/history"
    response = requests.get(
        endpoint, headers=client._headers(), params={"limit": limit}
    )
    response.raise_for_status()
    body = response.json()
    return body.get("history", [])
# Print the timestamp, size and YES price of every returned trade.
trades = get_market_history("INXD-24JAN10-T5805")
for t in trades:
    print(f"{t['created_time']}: {t['count']} @ {t['yes_price']}¢")
获取系列/事件
def get_series():
    """GET /series - list all series (categories).

    Returns:
        The ``series`` list from the response, or an empty list when absent.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    response = requests.get(f"{BASE_URL}/series", headers=client._headers())
    response.raise_for_status()
    body = response.json()
    return body.get("series", [])
def get_events(series_ticker: str = None):
    """GET /events - list events, optionally restricted to one series.

    Returns:
        The ``events`` list from the response, or an empty list when absent.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    # The filter is only sent when a truthy series ticker was given.
    query = {"series_ticker": series_ticker} if series_ticker else {}
    response = requests.get(
        f"{BASE_URL}/events", headers=client._headers(), params=query
    )
    response.raise_for_status()
    return response.json().get("events", [])
# List every series, then the events of the FED series.
series = get_series()
events = get_events("FED")
订单管理
下单
def place_order(
    ticker: str,
    side: str,  # "yes" or "no"
    action: str,  # "buy" or "sell"
    count: int,  # number of contracts
    price: int = None,  # limit price in cents (1-99); ignored for market orders
    order_type: str = "limit",  # "limit" or "market"
    expiration_ts: int = None,  # optional: good-till-date expiry timestamp
    client_order_id: str = None  # optional: caller-chosen reference ID
):
    """POST /portfolio/orders - submit a single order.

    ``price`` is quoted from the point of view of ``side``. The request
    always carries ``yes_price``, so a NO-side price is sent as
    ``100 - price``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: if ``side``, ``action``, ``order_type`` or ``count`` is
            invalid, or if a limit order has no price or a price outside
            1-99. Raised before any network call is made.
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    side = side.lower()
    action = action.lower()
    order_type = order_type.lower()

    if side not in ("yes", "no"):
        raise ValueError(f"side must be 'yes' or 'no', got {side!r}")
    if action not in ("buy", "sell"):
        raise ValueError(f"action must be 'buy' or 'sell', got {action!r}")
    if order_type not in ("limit", "market"):
        raise ValueError(
            f"order_type must be 'limit' or 'market', got {order_type!r}"
        )
    if not isinstance(count, int) or count <= 0:
        raise ValueError(f"count must be a positive integer, got {count!r}")

    payload = {
        "ticker": ticker,
        "side": side,
        "action": action,
        "count": count,
        "type": order_type,
    }
    if order_type == "limit":
        # A limit order without a usable price used to be sent with no price
        # field at all; reject it locally instead.
        if price is None:
            raise ValueError("limit orders require a price")
        if not 1 <= price <= 99:
            raise ValueError(f"price must be between 1 and 99 cents, got {price!r}")
        # yes_price is always expressed from the YES side.
        payload["yes_price"] = price if side == "yes" else (100 - price)
    # Compare against None so an explicit timestamp of 0 is still forwarded.
    if expiration_ts is not None:
        payload["expiration_ts"] = expiration_ts
    if client_order_id:
        payload["client_order_id"] = client_order_id

    r = requests.post(
        f"{BASE_URL}/portfolio/orders",
        headers=client._headers(),
        json=payload,
    )
    r.raise_for_status()
    return r.json()
# Examples
# Buy 10 YES contracts at 45 cents
result = place_order("INXD-24JAN10-T5805", "yes", "buy", 10, 45)
# Sell 5 NO contracts at 30 cents (equivalent to 70 cents on the YES side)
result = place_order("INXD-24JAN10-T5805", "no", "sell", 5, 30)
# Market order (fills immediately)
result = place_order("INXD-24JAN10-T5805", "yes", "buy", 10, order_type="market")
批量创建订单
def batch_create_orders(orders: list):
    """POST /portfolio/orders/batched - create several orders in one request.

    Args:
        orders: list of order dicts, sent as the ``orders`` field of the body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/portfolio/orders/batched"
    response = requests.post(
        endpoint, headers=client._headers(), json={"orders": orders}
    )
    response.raise_for_status()
    return response.json()
# Two resting YES bids on the same market, submitted in one batch.
orders = [
    {"ticker": "INXD-24JAN10-T5805", "side": "yes", "action": "buy", "count": 5, "type": "limit", "yes_price": 40},
    {"ticker": "INXD-24JAN10-T5805", "side": "yes", "action": "buy", "count": 5, "type": "limit", "yes_price": 42},
]
results = batch_create_orders(orders)
修改订单
def amend_order(order_id: str, count: int = None, price: int = None, side: str = "yes"):
    """POST /portfolio/orders/{order_id}/amend - change an order's size and/or price.

    Args:
        order_id: ID of the order to amend.
        count: new contract count, or None to leave it unchanged.
        price: new price in cents (1-99) quoted from the point of view of
            ``side``, or None to leave it unchanged.
        side: "yes" or "no". The request always carries ``yes_price``, so a
            NO-side price is sent as ``100 - price``. The default "yes"
            sends ``price`` through unchanged.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: if neither ``count`` nor ``price`` is given, or if a
            value is out of range. Raised before any network call is made.
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    side = side.lower()
    if side not in ("yes", "no"):
        raise ValueError(f"side must be 'yes' or 'no', got {side!r}")

    payload = {}
    if count is not None:
        if count <= 0:
            raise ValueError(f"count must be positive, got {count!r}")
        payload["count"] = count
    if price is not None:
        if not 1 <= price <= 99:
            raise ValueError(f"price must be between 1 and 99 cents, got {price!r}")
        payload["yes_price"] = price if side == "yes" else (100 - price)
    # An empty amendment is a caller bug, not something to send to the server.
    if not payload:
        raise ValueError("nothing to amend: pass count and/or price")

    r = requests.post(
        f"{BASE_URL}/portfolio/orders/{order_id}/amend",
        headers=client._headers(),
        json=payload,
    )
    r.raise_for_status()
    return r.json()
减少订单数量
def decrease_order(order_id: str, reduce_by: int):
    """POST /portfolio/orders/{order_id}/decrease - shrink an order.

    Args:
        order_id: ID of the order to shrink.
        reduce_by: sent as the ``reduce_by`` field of the request body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/portfolio/orders/{order_id}/decrease"
    body = {"reduce_by": reduce_by}
    response = requests.post(endpoint, headers=client._headers(), json=body)
    response.raise_for_status()
    return response.json()
取消订单
def cancel_order(order_id: str):
    """DELETE /portfolio/orders/{order_id} - cancel a single order.

    Returns:
        True when the server answered 200 or 204, False for any other
        status. HTTP errors are reported through the return value, not raised.
    """
    endpoint = f"{BASE_URL}/portfolio/orders/{order_id}"
    response = requests.delete(endpoint, headers=client._headers())
    return response.status_code in (200, 204)
def batch_cancel_orders(order_ids: list):
    """DELETE /portfolio/orders/batched - cancel several orders in one request.

    Args:
        order_ids: list of order IDs, sent as the ``order_ids`` body field.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/portfolio/orders/batched"
    response = requests.delete(
        endpoint, headers=client._headers(), json={"order_ids": order_ids}
    )
    response.raise_for_status()
    return response.json()
# Cancel a specific order
cancel_order("abc123-order-id")
# Cancel several at once
batch_cancel_orders(["order-1", "order-2", "order-3"])
获取订单
def get_orders(
    ticker: str = None,
    status: str = None,  # "resting", "canceled", "executed", "pending"
    limit: int = 100
):
    """GET /portfolio/orders - list the account's orders.

    Returns:
        The ``orders`` list from the response, or an empty list when absent.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    query = {"limit": limit}
    # Optional filters are only sent when they have a truthy value.
    filters = {"ticker": ticker, "status": status}
    query.update({key: value for key, value in filters.items() if value})

    response = requests.get(
        f"{BASE_URL}/portfolio/orders", headers=client._headers(), params=query
    )
    response.raise_for_status()
    return response.json().get("orders", [])
def get_order(order_id: str):
    """GET /portfolio/orders/{order_id} - fetch a single order.

    Returns:
        The ``order`` object from the response body.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        KeyError: if the response body has no ``order`` field.
    """
    endpoint = f"{BASE_URL}/portfolio/orders/{order_id}"
    response = requests.get(endpoint, headers=client._headers())
    response.raise_for_status()
    body = response.json()
    return body["order"]
# Fetch all resting (unfilled) orders
orders = get_orders(status="resting")
for o in orders:
    print(f"{o['order_id']}: {o['action']} {o['side']} {o['remaining_count']} @ {o['yes_price']}¢")
投资组合管理
获取余额
def get_balance():
    """GET /portfolio/balance - fetch the account balance.

    Returns:
        A dict with ``balance`` and ``portfolio_value``. Each is the
        corresponding response field (0 when absent) divided by 100, which
        converts cents to dollars.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    response = requests.get(
        f"{BASE_URL}/portfolio/balance", headers=client._headers()
    )
    response.raise_for_status()
    body = response.json()
    available_cents = body.get("balance", 0)
    portfolio_cents = body.get("portfolio_value", 0)
    return {
        "balance": available_cents / 100,
        "portfolio_value": portfolio_cents / 100,
    }
# Print the available cash and portfolio value in dollars.
bal = get_balance()
print(f"可用: ${bal['balance']:.2f}")
print(f"投资组合: ${bal['portfolio_value']:.2f}")
获取持仓
def get_positions(limit: int = 100):
    """GET /portfolio/positions - list current positions.

    Returns:
        The ``market_positions`` list from the response, or an empty list
        when absent.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/portfolio/positions"
    response = requests.get(
        endpoint, headers=client._headers(), params={"limit": limit}
    )
    response.raise_for_status()
    body = response.json()
    return body.get("market_positions", [])
# Print every non-flat position with its average price and realised P&L.
positions = get_positions()
for p in positions:
    if p.get("position", 0) != 0:
        print(f"{p['ticker']}: {p['position']} 合约 @ 均价 {p['average_price']}¢")
        print(f" 已实现盈亏: ${p.get('realized_pnl', 0) / 100:.2f}")
获取成交记录(交易历史)
def get_fills(
    ticker: str = None,
    limit: int = 100,
    cursor: str = None
):
    """GET /portfolio/fills - list one page of executed trades.

    Returns:
        A dict with ``fills`` (list, empty when absent from the response)
        and ``cursor`` (value to pass back in for the next page, or None).

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    query = {"limit": limit}
    # Optional filters are only sent when they have a truthy value.
    optional = {"ticker": ticker, "cursor": cursor}
    query.update({key: value for key, value in optional.items() if value})

    response = requests.get(
        f"{BASE_URL}/portfolio/fills", headers=client._headers(), params=query
    )
    response.raise_for_status()
    body = response.json()
    return {"fills": body.get("fills", []), "cursor": body.get("cursor")}
# Print each fill from the first page of results.
fills = get_fills()
for f in fills["fills"]:
    print(f"{f['created_time']}: {f['action']} {f['side']} {f['count']} @ {f['price']}¢")
获取结算记录
def get_settlements(limit: int = 100):
    """GET /portfolio/settlements - list the settlement history.

    Returns:
        The ``settlements`` list from the response, or an empty list when
        absent.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/portfolio/settlements"
    response = requests.get(
        endpoint, headers=client._headers(), params={"limit": limit}
    )
    response.raise_for_status()
    body = response.json()
    return body.get("settlements", [])
# Print the settlement value and revenue (converted to dollars) per market.
settlements = get_settlements()
for s in settlements:
    print(f"{s['ticker']}: 结算于 {s['settlement_value']}¢, 盈亏: ${s['revenue'] / 100:.2f}")
交易所状态
def get_exchange_status():
    """GET /exchange/status - fetch the exchange's operational status.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/exchange/status"
    response = requests.get(endpoint, headers=client._headers())
    response.raise_for_status()
    return response.json()
# Print the trading_active and exchange_active flags from the status.
status = get_exchange_status()
print(f"交易: {status.get('trading_active')}")
print(f"交易所开放: {status.get('exchange_active')}")
WebSocket(实时数据)
对于实时更新,在REST认证后使用WebSocket:
import websocket
import json
def on_message(ws, message):
    """Decode an incoming JSON frame and print it.

    Args:
        ws: the WebSocket the frame arrived on (unused).
        message: JSON text of the frame.
    """
    update = json.loads(message)
    print(f"更新: {update}")
def on_open(ws):
    """Subscribe to order-book updates as soon as the socket is open.

    Args:
        ws: the opened WebSocket; one JSON message is sent on it.
    """
    # NOTE(review): confirm this message schema against the current Kalshi
    # WebSocket documentation.
    subscription = {
        "type": "subscribe",
        "channel": "orderbook",
        "ticker": "INXD-24JAN10-T5805",
    }
    ws.send(json.dumps(subscription))
# Connect with the auth token. The client logs in lazily, so force a
# login/refresh first; otherwise client.token may still be None or expired
# when it is put into the URL.
client._ensure_auth()
ws = websocket.WebSocketApp(
    f"wss://trading-api.kalshi.com/trade-api/ws/v2?token={client.token}",
    on_message=on_message,
    on_open=on_open
)
ws.run_forever()
完整交易机器人
#!/usr/bin/env python3
"""
生产环境Kalshi交易机器人
"""
import os
import time
import requests
BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"
class KalshiBot:
    """Self-contained Kalshi client used by the example trading loop.

    Reads ``KALSHI_EMAIL`` / ``KALSHI_PASSWORD`` from the environment and
    logs in lazily on the first request.
    """

    # Seconds to wait on any HTTP call so a stalled connection cannot freeze
    # the trading loop.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.email = os.getenv("KALSHI_EMAIL")
        self.password = os.getenv("KALSHI_PASSWORD")
        self.token = None
        self.token_expiry = 0  # epoch seconds; 0 forces a login on first use

    def _auth(self):
        """Log in when there is no token or it is within 60 s of expiry."""
        if time.time() > self.token_expiry - 60:
            r = requests.post(
                f"{BASE_URL}/login",
                json={"email": self.email, "password": self.password},
                timeout=self.REQUEST_TIMEOUT,
            )
            r.raise_for_status()
            self.token = r.json()["token"]
            # Schedule the next refresh 29 minutes from now.
            self.token_expiry = time.time() + 29 * 60

    def _h(self):
        """Return auth headers, refreshing the token first when needed."""
        self._auth()
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"}

    def _get(self, path):
        """GET ``BASE_URL + path`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: if the server answers with a 4xx/5xx status.
        """
        r = requests.get(
            f"{BASE_URL}{path}", headers=self._h(), timeout=self.REQUEST_TIMEOUT
        )
        r.raise_for_status()
        return r.json()

    def get_market(self, ticker):
        """Return the ``market`` object for ``ticker``."""
        return self._get(f"/markets/{ticker}")["market"]

    def get_positions(self):
        """Return the account's positions as a dict keyed by market ticker."""
        body = self._get("/portfolio/positions")
        return {p["ticker"]: p for p in body.get("market_positions", [])}

    def get_balance(self):
        """Return the ``balance`` field divided by 100 (0 when absent)."""
        return self._get("/portfolio/balance").get("balance", 0) / 100

    def _order(self, ticker, side, action, count, price):
        """Submit a limit order; shared implementation of buy() and sell().

        ``price`` is quoted from the point of view of ``side``; the request
        always carries ``yes_price``, so a NO-side price is sent as
        ``100 - price``.

        Returns:
            The decoded response body on success, otherwise
            ``{"error": <response text>}``.

        Raises:
            ValueError: if ``side`` is not "yes" or "no" (case-insensitive).
        """
        side = side.lower()
        # Previously any value other than exactly "yes" (including "YES")
        # was silently priced as a NO order.
        if side not in ("yes", "no"):
            raise ValueError(f"side must be 'yes' or 'no', got {side!r}")
        payload = {
            "ticker": ticker, "side": side, "action": action,
            "count": count, "type": "limit",
            "yes_price": price if side == "yes" else (100 - price)
        }
        r = requests.post(
            f"{BASE_URL}/portfolio/orders",
            headers=self._h(),
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        # Accept any non-error status: a successful create may be answered
        # with a 2xx code other than 200 (e.g. 201 Created).
        return r.json() if r.ok else {"error": r.text}

    def buy(self, ticker, side, count, price):
        """Place a limit buy of ``count`` contracts on ``side`` at ``price`` cents."""
        return self._order(ticker, side, "buy", count, price)

    def sell(self, ticker, side, count, price):
        """Place a limit sell of ``count`` contracts on ``side`` at ``price`` cents."""
        return self._order(ticker, side, "sell", count, price)
# Run
bot = KalshiBot()
TICKER = "INXD-24JAN10-T5805"


def _report_order(result):
    """Print the rejection text when an order call returned {"error": ...}."""
    if isinstance(result, dict) and "error" in result:
        print(f"下单失败: {result['error']}")


def main():
    """Poll TICKER forever and trade on simple price thresholds.

    Every 10 s: buy 5 YES contracts when the ask is below 40 (while holding
    fewer than 10 contracts and more than $5), or sell the whole position
    when the bid is above 60. After any exception the loop prints the error
    and backs off for 30 s.
    """
    while True:
        try:
            market = bot.get_market(TICKER)
            positions = bot.get_positions()
            balance = bot.get_balance()
            yes_bid = market["yes_bid"]
            yes_ask = market["yes_ask"]
            pos = positions.get(TICKER, {}).get("position", 0)
            print(f"余额: ${balance:.2f}, 持仓: {pos}, 价格: {yes_bid}/{yes_ask}")
            # Trading logic. The ask must be positive: limit prices are
            # 1-99 cents, so an ask of 0 is not a price we can buy at.
            if 0 < yes_ask < 40 and pos < 10 and balance > 5:
                print(f"买入 at {yes_ask}")
                _report_order(bot.buy(TICKER, "yes", 5, yes_ask))
            elif yes_bid > 60 and pos > 0:
                print(f"卖出 at {yes_bid}")
                _report_order(bot.sell(TICKER, "yes", pos, yes_bid))
            time.sleep(10)
        except Exception as e:
            # Top-level boundary: report the failure and back off, so one
            # bad response does not stop the bot.
            print(f"错误: {e}")
            time.sleep(30)


if __name__ == "__main__":
    main()
热门市场系列
| 系列 | 描述 | 示例代码 |
|---|---|---|
| FED | 美联储利率决策 | FED-24MAR-T525 |
| INXD | 标普500每日收盘 | INXD-24JAN10-T5805 |
| KXBTC | 比特币价格区间 | KXBTC-24JAN-T45000 |
| KXETH | 以太坊价格 | KXETH-24JAN-T2500 |
| CPI | 通胀数据 | CPI-24JAN-T3.5 |
| GDP | GDP增长 | GDP-24Q1-T2.0 |
| NFP | 非农就业数据 | NFP-24JAN-T200K |
关键说明
- 价格以分为单位 - 45表示每合约0.45美元
- 合约若正确则支付1美元 - 成本为买入价格,利润为(1美元 − 买入价格);例如以45分买入,正确时每合约盈利0.55美元
- 交易费用因市场而异 - 下单前请查阅Kalshi官方费用表,并将费用与价差一并计入成本
- 令牌30分钟后过期 - 在过期前自动刷新
- 仅限美国居民 - 需要KYC验证
- 存在速率限制 - 对429错误实施指数退避
- 最大持仓限制 - 随市场变化,检查市场详情
CLI包装器
用于快速CLI访问:
python trading/kalshi.py search "fed rate"
python trading/kalshi.py market <ticker>
python trading/kalshi.py buy <ticker> <side> <count> <price>
python trading/kalshi.py sell <ticker> <side> <count> <price>
python trading/kalshi.py positions
python trading/kalshi.py balance
python trading/kalshi.py orders
python trading/kalshi.py cancel <order_id>