name: 投资组合同步
description: "从Polymarket、Kalshi和Manifold同步投资组合头寸"
emoji: "📁"
投资组合同步技能
从每个预测市场平台获取和同步头寸的实际方法。
Polymarket头寸同步
Polymarket头寸以Polygon上的ERC-1155代币形式持有。查询链上余额。
import os
import requests
# Wallet address whose balances are queried below; read from the
# POLY_FUNDER_ADDRESS environment variable (None when it is unset).
WALLET = os.getenv("POLY_FUNDER_ADDRESS")
CTF_CONTRACT = "0x4D97DCd97eC945f40cF65F87097ACe5EA0476045" # Conditional Tokens Framework contract (target of the balanceOf calls)
def get_polymarket_positions(token_ids: list[str]) -> dict:
    """
    Fetch the module-level WALLET's on-chain balance for specific token IDs.

    Each balance is read with an ERC-1155 ``balanceOf`` ``eth_call`` against
    CTF_CONTRACT through the public Polygon RPC endpoint, one request per
    token.

    Args:
        token_ids: Decimal-string token IDs to check (from market data).

    Returns:
        Dict of token ID -> share balance, containing only the tokens whose
        balance is greater than zero. An empty input yields an empty dict
        without any network traffic.

    Raises:
        ValueError: If WALLET is unset or is not a 0x-prefixed 20-byte hex
            address, or if a token ID is not a decimal integer string.
        requests.RequestException: If an RPC request fails or times out.
    """
    if not token_ids:
        return {}

    # Validate once up front: an unset env var used to surface as an opaque
    # TypeError from slicing None.
    has_prefix = isinstance(WALLET, str) and WALLET[:2].lower() == "0x"
    owner = WALLET[2:].lower() if has_prefix else ""
    if len(owner) != 40 or any(ch not in "0123456789abcdef" for ch in owner):
        raise ValueError(
            "POLY_FUNDER_ADDRESS must be a 0x-prefixed 20-byte hex address, "
            f"got {WALLET!r}"
        )

    positions = {}
    for token_id in token_ids:
        # ERC-1155 balanceOf call: selector 0x00fdd58e, then the owner and the
        # token id, each left-padded to 32 bytes.
        data = f"0x00fdd58e{owner:0>64}{int(token_id):064x}"
        r = requests.post(
            "https://polygon-rpc.com/",
            json={
                "jsonrpc": "2.0",
                "method": "eth_call",
                "params": [{"to": CTF_CONTRACT, "data": data}, "latest"],
                "id": 1,
            },
            timeout=10,
        )
        r.raise_for_status()
        # A missing or null result is treated as a zero balance, and so is the
        # empty hex string "0x", which int(..., 16) cannot parse.
        result = r.json().get("result") or "0x0"
        raw = 0 if result in ("0x", "0X") else int(result, 16)
        balance = raw / 1e6  # raw on-chain value converted to shares
        if balance > 0:
            positions[token_id] = balance
    return positions
# Example: check positions in the BTC 15-minute market
btc_tokens = [
"21742633143463906290569050155826241533067272736897614950488156847949938836455", # YES
"48331043336612883890938759509493159234755048973500640148014422747788308965745" # NO
]
positions = get_polymarket_positions(btc_tokens)
# Print every token with a non-zero balance (ID truncated to 20 characters).
for token_id, balance in positions.items():
print(f"代币 {token_id[:20]}...: {balance} 份额")
获取所有Polymarket头寸(通过Gamma API)
def get_all_polymarket_positions(wallet: str):
    """Fetch every position held by a wallet through the Gamma API.

    Args:
        wallet: Wallet address; it is lower-cased before being sent.

    Returns:
        A list of dicts with the keys ``market_id``, ``market_question``,
        ``token_id``, ``outcome``, ``size``, ``avg_price``, ``current_price``,
        ``pnl`` and ``value``. Numeric fields that are missing, null or
        non-numeric become 0.0. An empty list is returned when the API
        answers with a non-200 status or a payload that is not a list.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    # NOTE(review): confirm that this host serves /positions and that the
    # response really uses tokenId/currentPrice/pnl/value; Polymarket's Data
    # API documents a positions endpoint whose field names may differ.

    def as_float(value):
        # The API may send null or a non-numeric value; float(None) would
        # otherwise abort the whole sync.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    r = requests.get(
        "https://gamma-api.polymarket.com/positions",
        params={"user": wallet.lower()},
        timeout=10,
    )
    if r.status_code != 200:
        return []  # deliberate best-effort: treat an API failure as "no positions"

    payload = r.json()
    if not isinstance(payload, list):
        return []  # e.g. an error object instead of a list of positions

    return [
        {
            "market_id": p.get("conditionId"),
            "market_question": p.get("title", "未知"),
            "token_id": p.get("tokenId"),
            "outcome": p.get("outcome"),
            "size": as_float(p.get("size", 0)),
            "avg_price": as_float(p.get("avgPrice", 0)),
            "current_price": as_float(p.get("currentPrice", 0)),
            "pnl": as_float(p.get("pnl", 0)),
            "value": as_float(p.get("value", 0)),
        }
        for p in payload
    ]
# Example: list every position held by WALLET with its prices and P&L.
positions = get_all_polymarket_positions(WALLET)
for p in positions:
# Question text is truncated to 40 characters for display.
print(f"{p['market_question'][:40]}")
print(f" {p['outcome']}: {p['size']} 份额 @ {p['avg_price']:.2f} -> {p['current_price']:.2f}")
print(f" 盈亏: ${p['pnl']:.2f}")
获取USDC余额
def get_usdc_balance(wallet: str) -> float:
    """Return a wallet's USDC balance on Polygon.

    The balance is read with an ERC-20 ``balanceOf`` ``eth_call`` through the
    public Polygon RPC endpoint.

    Args:
        wallet: 0x-prefixed 20-byte hex wallet address.

    Returns:
        The balance in whole USDC (raw value divided by 1e6).

    Raises:
        ValueError: If ``wallet`` is not a 0x-prefixed 20-byte hex address.
        requests.RequestException: If the RPC request fails or times out.
    """
    USDC = "0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174"  # USDC on Polygon

    # Reject malformed input before building calldata; a bad address would
    # otherwise produce a malformed eth_call or an opaque TypeError.
    has_prefix = isinstance(wallet, str) and wallet[:2].lower() == "0x"
    owner = wallet[2:].lower() if has_prefix else ""
    if len(owner) != 40 or any(ch not in "0123456789abcdef" for ch in owner):
        raise ValueError(
            f"wallet must be a 0x-prefixed 20-byte hex address, got {wallet!r}"
        )

    # ERC-20 balanceOf: selector 0x70a08231 + owner left-padded to 32 bytes.
    data = f"0x70a08231{owner:0>64}"
    r = requests.post(
        "https://polygon-rpc.com/",
        json={
            "jsonrpc": "2.0",
            "method": "eth_call",
            "params": [{"to": USDC, "data": data}, "latest"],
            "id": 1,
        },
        timeout=10,
    )
    r.raise_for_status()
    # A missing/null result and the empty hex string "0x" both count as zero.
    result = r.json().get("result") or "0x0"
    raw = 0 if result in ("0x", "0X") else int(result, 16)
    return raw / 1e6  # USDC has 6 decimals
# Example: print WALLET's USDC balance with two decimals.
usdc = get_usdc_balance(WALLET)
print(f"USDC余额: ${usdc:.2f}")
Kalshi头寸同步
import requests
import time
# Root URL that every Kalshi request in KalshiSync is built from.
# NOTE(review): confirm this host and the email/password login flow are still
# what Kalshi currently supports.
BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"
class KalshiSync:
    """Read-only client for a Kalshi account's positions and balance.

    Logs in with email/password against BASE_URL, caches the returned bearer
    token, and refreshes it shortly before the assumed expiry.
    """

    def __init__(self, email: str, password: str, timeout: float = 10.0):
        """Store credentials; no network request is made until first use.

        Args:
            email: Kalshi account email.
            password: Kalshi account password.
            timeout: Per-request timeout in seconds.
        """
        self.email = email
        self.password = password
        self.timeout = timeout
        self.token = None
        self.token_expiry = 0

    def _auth(self):
        """Log in unless a token with more than 60 s of lifetime is cached."""
        if time.time() <= self.token_expiry - 60:
            return
        r = requests.post(
            f"{BASE_URL}/login",
            json={"email": self.email, "password": self.password},
            timeout=self.timeout,
        )
        r.raise_for_status()
        self.token = r.json()["token"]
        # The token is treated as valid for 29 minutes from login.
        self.token_expiry = time.time() + 29 * 60

    def _headers(self):
        """Return the Authorization header, refreshing the token if needed."""
        self._auth()
        return {"Authorization": f"Bearer {self.token}"}

    @staticmethod
    def _to_position(p: dict, market: dict) -> dict:
        """Convert one raw position plus its market details to the common shape.

        A positive ``position`` count is a YES holding and anything else is
        reported as NO. Prices arrive in cents and are converted to dollars.
        YES holdings are priced at ``yes_bid`` and NO holdings at ``no_bid``;
        pricing NO contracts at the YES bid misstates their price and value.
        When the bid is missing, 50 cents is used as a neutral fallback.
        """
        contracts = p.get("position", 0)
        holds_yes = contracts > 0
        size = abs(contracts)
        bid_cents = market.get("yes_bid" if holds_yes else "no_bid", 50)
        return {
            "market_id": p["ticker"],
            "market_question": market.get("title", p["ticker"]),
            "side": "是" if holds_yes else "否",
            "size": size,
            "avg_price": p.get("average_price", 0) / 100,
            "current_price": bid_cents / 100,
            "value": size * bid_cents / 100,
            "pnl": p.get("realized_pnl", 0) / 100,
        }

    def get_positions(self):
        """Fetch all Kalshi positions, enriched with each market's details.

        Returns:
            A list of dicts with the keys ``market_id``, ``market_question``,
            ``side``, ``size``, ``avg_price``, ``current_price``, ``value``
            and ``pnl`` (realized P&L only).

        Raises:
            requests.HTTPError: If login or the positions request fails.
        """
        r = requests.get(
            f"{BASE_URL}/portfolio/positions",
            headers=self._headers(),
            timeout=self.timeout,
        )
        r.raise_for_status()

        positions = []
        for p in r.json().get("market_positions", []):
            # Fetch market details; if that lookup fails, keep the position
            # and fall back to the ticker as its title and the default price.
            detail = requests.get(
                f"{BASE_URL}/markets/{p['ticker']}",
                headers=self._headers(),
                timeout=self.timeout,
            )
            market = detail.json().get("market", {}) if detail.ok else {}
            positions.append(self._to_position(p, market))
        return positions

    def get_balance(self):
        """Fetch the Kalshi balance.

        Returns:
            Dict with ``available`` and ``portfolio_value``, both converted
            from cents to dollars.

        Raises:
            requests.HTTPError: If login or the balance request fails.
        """
        r = requests.get(
            f"{BASE_URL}/portfolio/balance",
            headers=self._headers(),
            timeout=self.timeout,
        )
        r.raise_for_status()
        data = r.json()
        return {
            "available": data.get("balance", 0) / 100,
            "portfolio_value": data.get("portfolio_value", 0) / 100,
        }
# Usage example: list Kalshi positions, then the account balance.
sync = KalshiSync(os.getenv("KALSHI_EMAIL"), os.getenv("KALSHI_PASSWORD"))
positions = sync.get_positions()
for p in positions:
    print(f"{p['market_question'][:40]}")
    print(f" {p['side']}: {p['size']} @ {p['avg_price']:.2f} -> {p['current_price']:.2f}")
balance = sync.get_balance()
# The leading "\n" separates the balance from the position list; a literal
# line break inside a single-quoted f-string is a syntax error.
print(f"\n可用余额: ${balance['available']:.2f}")
print(f"投资组合价值: ${balance['portfolio_value']:.2f}")
Manifold头寸同步
import requests
# Root URL for the Manifold requests made in get_manifold_positions.
API_URL = "https://api.manifold.markets/v0"
# API key sent in the Authorization header; read from the MANIFOLD_API_KEY
# environment variable (None when it is unset).
API_KEY = os.getenv("MANIFOLD_API_KEY")
def _aggregate_manifold_bets(bets):
    """Sum open bets into per-market YES/NO share counts and invested amount."""
    markets = {}
    for bet in bets:
        if bet.get("isSold") or bet.get("isCancelled"):
            continue
        entry = markets.setdefault(bet["contractId"], {
            "yes_shares": 0,
            "no_shares": 0,
            "invested": 0,
            "question": bet.get("contractQuestion", "未知"),
        })
        # Any outcome other than "YES" is counted on the NO side.
        side = "yes_shares" if bet.get("outcome") == "YES" else "no_shares"
        entry[side] += bet.get("shares", 0)
        entry["invested"] += bet.get("amount", 0)
    return markets


def get_manifold_positions():
    """Fetch all Manifold positions for the user identified by API_KEY.

    Bets are downloaded page by page, aggregated per market, and valued at
    each market's current probability.

    Returns:
        A ``(positions, balance)`` tuple. ``positions`` is a list of dicts
        with the keys ``market_id``, ``market_question``, ``yes_shares``,
        ``no_shares``, ``invested``, ``current_value``, ``probability``,
        ``pnl`` and ``url``; ``balance`` is the user's balance from ``/me``.
        Markets whose details cannot be fetched are skipped.

    Raises:
        requests.HTTPError: If the profile or bets request fails.
    """
    headers = {"Authorization": f"Key {API_KEY}"}
    page_size = 1000
    timeout = 10

    # Fetch the user profile.
    r = requests.get(f"{API_URL}/me", headers=headers, timeout=timeout)
    r.raise_for_status()
    user = r.json()
    user_id = user["id"]
    balance = user.get("balance", 0)

    # Fetch all bets. A single request returns at most `page_size` bets, so
    # follow the `before` cursor until a short page arrives; otherwise large
    # accounts are silently truncated.
    bets = []
    before = None
    while True:
        params = {"userId": user_id, "limit": page_size}
        if before:
            params["before"] = before
        r = requests.get(f"{API_URL}/bets", headers=headers, params=params, timeout=timeout)
        r.raise_for_status()
        page = r.json()
        bets.extend(page)
        if len(page) < page_size:
            break
        cursor = page[-1].get("id")
        if not cursor or cursor == before:
            break  # no usable cursor: stop rather than loop forever
        before = cursor

    # Aggregate positions per market, then price them.
    positions = []
    for mid, data in _aggregate_manifold_bets(bets).items():
        # Share totals are float sums, so a fully closed position can leave a
        # tiny residue instead of an exact zero.
        if abs(data["yes_shares"]) < 1e-9 and abs(data["no_shares"]) < 1e-9:
            continue
        # Fetch the current market price.
        r = requests.get(f"{API_URL}/market/{mid}", timeout=timeout)
        if r.status_code != 200:
            continue
        market = r.json()
        prob = market.get("probability", 0.5)
        yes_value = data["yes_shares"] * prob
        no_value = data["no_shares"] * (1 - prob)
        total_value = yes_value + no_value
        positions.append({
            "market_id": mid,
            "market_question": data["question"],
            "yes_shares": data["yes_shares"],
            "no_shares": data["no_shares"],
            "invested": data["invested"],
            "current_value": total_value,
            "probability": prob,
            "pnl": total_value - data["invested"],
            "url": market.get("url", ""),
        })
    return positions, balance
# Usage example: print the Mana balance, then each Manifold position.
positions, balance = get_manifold_positions()
print(f"Mana余额: {balance}")
for p in positions:
    # The leading "\n" puts a blank line before each market; a literal line
    # break inside a single-quoted f-string is a syntax error.
    print(f"\n{p['market_question'][:50]}")
    print(f" 是: {p['yes_shares']:.1f} 份额, 否: {p['no_shares']:.1f} 份额")
    print(f" 价值: {p['current_value']:.0f}M, 盈亏: {p['pnl']:+.0f}M")
统一投资组合同步
#!/usr/bin/env python3
"""
从所有预测市场同步投资组合
"""
import os
from dataclasses import dataclass
from typing import List
@dataclass
class Position:
    """One holding on a prediction-market platform, in a platform-neutral shape."""

    # Source platform name, e.g. "polymarket", "kalshi" or "manifold".
    platform: str
    # Platform-specific market identifier.
    market_id: str
    # Human-readable market question or title.
    market_question: str
    # Which outcome is held.
    side: str
    # Number of shares/contracts held.
    size: float
    # Average entry price per share.
    avg_price: float
    # Latest price per share.
    current_price: float
    # Current value of the holding.
    value: float
    # Profit and loss as an absolute amount.
    pnl: float
    # Profit and loss as a percentage.
    pnl_pct: float
def _pct_change(current, basis):
    """Return the percent change from basis to current; 0 when basis is not positive."""
    if not basis or basis <= 0:
        return 0
    return (current - basis) / basis * 100


def _priced_position(platform, p, side):
    """Build a Position from an adapter dict that carries avg/current prices."""
    return Position(
        platform=platform,
        market_id=p["market_id"],
        market_question=p["market_question"],
        side=side,
        size=p["size"],
        # Keep the reported average price. Substituting a made-up 0.01 for an
        # unknown price would fabricate both the price and the percentage.
        avg_price=p["avg_price"],
        current_price=p["current_price"],
        value=p["value"],
        pnl=p["pnl"],
        pnl_pct=_pct_change(p["current_price"], p["avg_price"]),
    )


def sync_all_portfolios() -> List["Position"]:
    """Sync positions from every platform that has credentials configured.

    A platform is queried only when its environment variable is set:
    POLY_FUNDER_ADDRESS (Polymarket), KALSHI_EMAIL together with
    KALSHI_PASSWORD (Kalshi), and MANIFOLD_API_KEY (Manifold).

    Returns:
        A flat list of Position objects across all queried platforms. A
        Manifold market yields one Position per side that is held. The
        percentage P&L is 0 whenever its basis (average price, or invested
        amount for Manifold) is zero or unknown.
    """
    all_positions = []

    # Polymarket
    wallet = os.getenv("POLY_FUNDER_ADDRESS")
    if wallet:
        for p in get_all_polymarket_positions(wallet):
            all_positions.append(_priced_position("polymarket", p, p["outcome"]))

    # Kalshi
    kalshi_email = os.getenv("KALSHI_EMAIL")
    if kalshi_email:
        kalshi = KalshiSync(kalshi_email, os.getenv("KALSHI_PASSWORD"))
        for p in kalshi.get_positions():
            all_positions.append(_priced_position("kalshi", p, p["side"]))

    # Manifold
    if os.getenv("MANIFOLD_API_KEY"):
        mani_positions, _ = get_manifold_positions()
        for p in mani_positions:
            invested = p["invested"]
            pnl_pct = (p["pnl"] / invested * 100) if invested and invested > 0 else 0
            prob = p["probability"]
            held = [
                (side, shares, price)
                for side, shares, price in (
                    ("是", p["yes_shares"], prob),
                    ("否", p["no_shares"], 1 - prob),
                )
                if shares > 0
            ]
            for side, shares, price in held:
                all_positions.append(Position(
                    platform="manifold",
                    market_id=p["market_id"],
                    market_question=p["market_question"],
                    side=side,
                    size=shares,
                    avg_price=0,  # Manifold does not track this value
                    current_price=price,
                    value=shares * price,
                    # The market's P&L is split evenly across the sides that
                    # are actually held, so a one-sided holding keeps all of
                    # it and the per-platform total is not halved.
                    pnl=p["pnl"] / len(held),
                    pnl_pct=pnl_pct,
                ))
    return all_positions
# Run the sync
positions = sync_all_portfolios()
# Print the summary. The "\n" escapes below stand in for literal line breaks
# inside single-quoted f-strings, which are a syntax error.
total_value = sum(p.value for p in positions)
total_pnl = sum(p.pnl for p in positions)
print(f"\n{'='*60}")
print(f"投资组合摘要")
print(f"{'='*60}")
print(f"总价值: ${total_value:.2f}")
print(f"总盈亏: ${total_pnl:+.2f}")
print(f"{'='*60}")
# Per-platform breakdown; platforms without positions are omitted.
for platform in ["polymarket", "kalshi", "manifold"]:
    plat_positions = [p for p in positions if p.platform == platform]
    if plat_positions:
        plat_value = sum(p.value for p in plat_positions)
        plat_pnl = sum(p.pnl for p in plat_positions)
        print(f"\n{platform.upper()}: ${plat_value:.2f} (盈亏: ${plat_pnl:+.2f})")
        for p in plat_positions:
            print(f" {p.market_question[:35]}")
            print(f" {p.side}: {p.size:.1f} @ {p.avg_price:.2f} -> {p.current_price:.2f}")
            print(f" 价值: ${p.value:.2f}, 盈亏: ${p.pnl:+.2f} ({p.pnl_pct:+.1f}%)")
自动同步的Cron任务
#!/usr/bin/env python3
"""
每小时运行以同步头寸到数据库
"""
import sqlite3
from datetime import datetime
def sync_to_db(db_path: str = "~/.clodds/clodds.db"):
    """Sync all positions into the SQLite ``positions`` table.

    Positions are fetched first, then written in a single transaction with
    ``INSERT OR REPLACE``. All rows of one run share the same ``updated_at``
    timestamp.

    Args:
        db_path: Path of the SQLite database file; a leading ``~`` is
            expanded to the user's home directory.

    Raises:
        sqlite3.Error: If the database cannot be opened or written. The
            ``positions`` table is expected to exist already.
    """
    # Fetch before opening the database so a network failure never leaves a
    # connection open.
    positions = sync_all_portfolios()
    updated_at = datetime.now().isoformat()
    rows = [
        (
            p.platform, p.market_id, p.market_question, p.side,
            p.size, p.avg_price, p.current_price, p.value, p.pnl,
            updated_at,
        )
        for p in positions
    ]

    # sqlite3 does not expand "~"; without expanduser the literal path
    # "~/.clodds/clodds.db" is opened relative to the working directory.
    conn = sqlite3.connect(os.path.expanduser(db_path))
    try:
        with conn:  # commits on success, rolls back if an insert fails
            conn.executemany("""
                INSERT OR REPLACE INTO positions
                (platform, market_id, market_question, side, size, avg_price, current_price, value, pnl, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, rows)
    finally:
        conn.close()
    print(f"在 {datetime.now()} 同步了 {len(positions)} 个头寸")


if __name__ == "__main__":
    sync_to_db()
添加到crontab:
# 每小时同步一次
0 * * * * cd /path/to/clodds && python3 -c "from skills.portfolio_sync import sync_to_db; sync_to_db()"