投资组合同步技能(Skill:portfolio-sync)

此技能提供完整的Python解决方案,用于从Polymarket、Kalshi和Manifold等预测市场平台同步投资组合头寸。它包括API调用、数据获取、统一同步和自动定时任务,适用于量化交易者、数据分析师和金融科技开发者进行投资管理和风险监控。关键词:投资组合同步、预测市场、量化交易、Python编程、API集成、数据获取、自动同步、金融科技。

量化策略 1 次安装 2 次浏览 更新于 3/9/2026

name: 投资组合同步
description: "从Polymarket、Kalshi和Manifold同步投资组合头寸"
emoji: "📁"

投资组合同步技能

从每个预测市场平台获取和同步头寸的实际方法。

Polymarket头寸同步

Polymarket头寸以Polygon上的ERC-1155代币形式持有,因此可以通过查询链上余额来获取头寸。

import os
import requests

# Wallet address whose positions are queried; read from the
# POLY_FUNDER_ADDRESS environment variable (None when it is unset).
WALLET = os.getenv("POLY_FUNDER_ADDRESS")
CTF_CONTRACT = "0x4D97DCd97eC945f40cF65F87097ACe5EA0476045"  # Conditional Tokens Framework

def get_polymarket_positions(token_ids: list[str]) -> dict:
    """
    Fetch on-chain balances for specific Polymarket outcome tokens.

    Issues one ERC-1155 ``balanceOf(address,uint256)`` ``eth_call`` per token
    against ``CTF_CONTRACT`` for the module-level ``WALLET`` address.

    Args:
        token_ids: Decimal-string token IDs to check (from market data).

    Returns:
        Mapping of token ID -> share balance. Tokens whose balance is zero
        are omitted.
    """
    positions = {}
    if not token_ids:
        return positions

    # Loop-invariant: the wallet address without its "0x" prefix, lower-cased.
    # The literal zeros in the calldata below left-pad it to a 32-byte word.
    wallet_hex = WALLET[2:].lower()

    for token_id in token_ids:
        token_int = int(token_id)

        # 0x00fdd58e is the selector for ERC-1155 balanceOf(address,uint256);
        # the token ID is encoded as a 32-byte big-endian hex word.
        data = f"0x00fdd58e000000000000000000000000{wallet_hex}{token_int:064x}"

        # A timeout keeps one unresponsive RPC node from hanging the sync.
        r = requests.post("https://polygon-rpc.com/", json={
            "jsonrpc": "2.0",
            "method": "eth_call",
            "params": [{"to": CTF_CONTRACT, "data": data}, "latest"],
            "id": 1
        }, timeout=10)

        # A JSON-RPC error response carries no "result", and a node may answer
        # with a bare "0x" for empty return data, which int(..., 16) rejects.
        # Both are treated as a zero balance.
        result = r.json().get("result") or "0x0"
        raw = 0 if result == "0x" else int(result, 16)
        balance = raw / 1e6  # raw on-chain units -> shares

        if balance > 0:
            positions[token_id] = balance

    return positions

# 示例: 检查BTC 15分钟市场的头寸
btc_tokens = [
    "21742633143463906290569050155826241533067272736897614950488156847949938836455",  # 是
    "48331043336612883890938759509493159234755048973500640148014422747788308965745"   # 否
]

positions = get_polymarket_positions(btc_tokens)
for token_id, balance in positions.items():
    print(f"代币 {token_id[:20]}...: {balance} 份额")

获取所有Polymarket头寸(通过Gamma API)

def get_all_polymarket_positions(wallet: str):
    """Fetch every position held by a wallet from the Polymarket HTTP API.

    Args:
        wallet: Wallet address; it is lower-cased before being sent as the
            ``user`` query parameter.

    Returns:
        A list of dicts with the keys ``market_id``, ``market_question``,
        ``token_id``, ``outcome``, ``size``, ``avg_price``, ``current_price``,
        ``pnl`` and ``value``. An empty list is returned when the API answers
        with any status other than 200.
    """
    # NOTE(review): the endpoint and the response field names read below
    # (tokenId, currentPrice, pnl, value, ...) should be confirmed against the
    # current Polymarket API schema.
    r = requests.get(
        "https://gamma-api.polymarket.com/positions",
        params={"user": wallet.lower()},  # let requests do the URL encoding
        timeout=10,  # never hang the sync on an unresponsive server
    )

    if r.status_code != 200:
        return []

    def number(record, key):
        # A key that is present but null would make float() raise TypeError;
        # treat null/missing as 0 just like an absent key.
        return float(record.get(key) or 0)

    return [
        {
            "market_id": p.get("conditionId"),
            "market_question": p.get("title", "未知"),
            "token_id": p.get("tokenId"),
            "outcome": p.get("outcome"),
            "size": number(p, "size"),
            "avg_price": number(p, "avgPrice"),
            "current_price": number(p, "currentPrice"),
            "pnl": number(p, "pnl"),
            "value": number(p, "value"),
        }
        for p in r.json()
    ]

# Usage example: list every position of the configured wallet with its
# average entry price, current price and P&L.
positions = get_all_polymarket_positions(WALLET)
for p in positions:
    print(f"{p['market_question'][:40]}")
    print(f"  {p['outcome']}: {p['size']} 份额 @ {p['avg_price']:.2f} -> {p['current_price']:.2f}")
    print(f"  盈亏: ${p['pnl']:.2f}")

获取USDC余额

def get_usdc_balance(wallet: str) -> float:
    """Return the USDC balance of ``wallet`` on Polygon.

    Performs an ERC-20 ``balanceOf(address)`` ``eth_call`` against the USDC
    token contract via the public Polygon RPC endpoint.

    Args:
        wallet: "0x"-prefixed wallet address.

    Returns:
        The balance in whole USDC (the raw value divided by 10**6).
    """
    USDC = "0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174"  # USDC on Polygon

    # 0x70a08231 is the ERC-20 balanceOf(address) selector; the address is
    # left-padded with zeros to a 32-byte word.
    data = f"0x70a08231000000000000000000000000{wallet[2:].lower()}"

    # A timeout keeps an unresponsive RPC node from hanging the caller.
    r = requests.post("https://polygon-rpc.com/", json={
        "jsonrpc": "2.0",
        "method": "eth_call",
        "params": [{"to": USDC, "data": data}, "latest"],
        "id": 1
    }, timeout=10)

    # A JSON-RPC error response has no "result", and a bare "0x" (empty return
    # data) cannot be parsed by int(..., 16); both count as a zero balance.
    result = r.json().get("result") or "0x0"
    raw = 0 if result == "0x" else int(result, 16)

    return raw / 1e6  # USDC has 6 decimals

# Usage example: print the USDC balance of the configured wallet.
usdc = get_usdc_balance(WALLET)
print(f"USDC余额: ${usdc:.2f}")

Kalshi头寸同步

import requests
import time

# Prefix for every Kalshi request made by KalshiSync below.
BASE_URL = "https://trading-api.kalshi.com/trade-api/v2"

class KalshiSync:
    """Small client that reads positions and balance from the Kalshi API.

    Logs in with email/password and caches the returned token, logging in
    again shortly before the locally tracked expiry time.

    NOTE(review): confirm that email/password login and ``BASE_URL`` are still
    supported by Kalshi before relying on this class.
    """

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self, email: str, password: str):
        """Store the credentials; no network call is made until first use."""
        self.email = email
        self.password = password
        self.token = None
        self.token_expiry = 0

    def _auth(self):
        """Log in when there is no token or it is within 60 s of expiry.

        Raises:
            requests.HTTPError: if the login request is rejected.
        """
        if time.time() > self.token_expiry - 60:
            r = requests.post(f"{BASE_URL}/login", json={
                "email": self.email,
                "password": self.password
            }, timeout=self.REQUEST_TIMEOUT)
            r.raise_for_status()
            self.token = r.json()["token"]
            # Tracked locally as 29 minutes from login
            # (presumably just under the server-side lifetime; confirm).
            self.token_expiry = time.time() + 29 * 60

    def _headers(self):
        """Return the Authorization header, refreshing the token if needed."""
        self._auth()
        return {"Authorization": f"Bearer {self.token}"}

    @staticmethod
    def _build_position(p: dict, market: dict) -> dict:
        """Convert one raw position plus its market details into a row.

        A positive ``position`` count is a YES holding and is priced at the
        market's ``yes_bid``; any other count is labelled NO and priced at
        ``no_bid``, so a NO holding is no longer valued at the YES price.
        Prices fall back to 50 when the market carries no quote. API amounts
        are divided by 100 (presumably cents to dollars; confirm).
        """
        contracts = p.get("position", 0)
        is_yes = contracts > 0
        bid = market.get("yes_bid" if is_yes else "no_bid", 50)
        size = abs(contracts)

        return {
            "market_id": p["ticker"],
            "market_question": market.get("title", p["ticker"]),
            "side": "是" if is_yes else "否",
            "size": size,
            "avg_price": p.get("average_price", 0) / 100,
            "current_price": bid / 100,
            "value": size * bid / 100,
            "pnl": p.get("realized_pnl", 0) / 100
        }

    def get_positions(self):
        """Fetch all Kalshi positions, enriched with market title and price.

        One extra request is made per position to load its market details.

        Returns:
            A list of dicts with the keys ``market_id``, ``market_question``,
            ``side``, ``size``, ``avg_price``, ``current_price``, ``value``
            and ``pnl``.

        Raises:
            requests.HTTPError: if the positions request fails.
        """
        r = requests.get(
            f"{BASE_URL}/portfolio/positions",
            headers=self._headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        r.raise_for_status()

        positions = []
        for p in r.json().get("market_positions", []):
            # Best effort: a response without a "market" object leaves the
            # details empty, and the row falls back to the ticker as its title.
            market = requests.get(
                f"{BASE_URL}/markets/{p['ticker']}",
                headers=self._headers(),
                timeout=self.REQUEST_TIMEOUT,
            ).json().get("market", {})

            positions.append(self._build_position(p, market))

        return positions

    def get_balance(self):
        """Fetch the account balance.

        Returns:
            Dict with ``available`` and ``portfolio_value``, each the API
            value divided by 100.

        Raises:
            requests.HTTPError: if the balance request fails.
        """
        r = requests.get(
            f"{BASE_URL}/portfolio/balance",
            headers=self._headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        r.raise_for_status()
        data = r.json()
        return {
            "available": data.get("balance", 0) / 100,
            "portfolio_value": data.get("portfolio_value", 0) / 100
        }

# Usage example: credentials come from the KALSHI_EMAIL / KALSHI_PASSWORD
# environment variables.
sync = KalshiSync(os.getenv("KALSHI_EMAIL"), os.getenv("KALSHI_PASSWORD"))

positions = sync.get_positions()
for p in positions:
    print(f"{p['market_question'][:40]}")
    print(f"  {p['side']}: {p['size']} @ {p['avg_price']:.2f} -> {p['current_price']:.2f}")

balance = sync.get_balance()
# The leading "\n" escape restores the blank line before the balance summary;
# the literal was split across two source lines, which is a syntax error.
print(f"\n可用余额: ${balance['available']:.2f}")
print(f"投资组合价值: ${balance['portfolio_value']:.2f}")

Manifold头寸同步

import requests

# Prefix for every Manifold request made below.
API_URL = "https://api.manifold.markets/v0"
# API key read from the MANIFOLD_API_KEY environment variable (None if unset).
API_KEY = os.getenv("MANIFOLD_API_KEY")

def get_manifold_positions():
    """Fetch the authenticated user's Manifold positions and mana balance.

    Loads the user's profile and up to 1000 bets, aggregates the bets per
    market, then prices each market with open shares at its current
    probability.

    Returns:
        A ``(positions, balance)`` tuple. ``positions`` is a list of dicts
        with the keys ``market_id``, ``market_question``, ``yes_shares``,
        ``no_shares``, ``invested``, ``current_value``, ``probability``,
        ``pnl`` and ``url``; ``balance`` is the profile's ``balance`` field.

    Raises:
        requests.HTTPError: if the profile or bets request fails.
    """
    headers = {"Authorization": f"Key {API_KEY}"}
    timeout = 10  # seconds per request, so a stalled server cannot hang the sync

    # User profile
    r = requests.get(f"{API_URL}/me", headers=headers, timeout=timeout)
    r.raise_for_status()
    user = r.json()
    user_id = user["id"]
    balance = user.get("balance", 0)

    # All bets. NOTE(review): only a single page of at most 1000 bets is
    # read; accounts with more bets would need pagination.
    r = requests.get(
        f"{API_URL}/bets",
        headers=headers,
        params={"userId": user_id, "limit": 1000},
        timeout=timeout,
    )
    # Without this check an error payload (a JSON object) would be iterated
    # below as if it were the list of bets.
    r.raise_for_status()
    bets = r.json()

    # Aggregate positions per market
    markets = {}
    for bet in bets:
        if bet.get("isSold") or bet.get("isCancelled"):
            continue

        mid = bet["contractId"]
        if mid not in markets:
            markets[mid] = {
                "yes_shares": 0,
                "no_shares": 0,
                "invested": 0,
                "question": bet.get("contractQuestion", "未知")
            }

        if bet["outcome"] == "YES":
            markets[mid]["yes_shares"] += bet.get("shares", 0)
        else:
            markets[mid]["no_shares"] += bet.get("shares", 0)

        markets[mid]["invested"] += bet["amount"]

    # Price every market that still has shares
    positions = []
    for mid, data in markets.items():
        if data["yes_shares"] == 0 and data["no_shares"] == 0:
            continue

        # Best effort: a market whose lookup does not return 200 is skipped.
        r = requests.get(f"{API_URL}/market/{mid}", timeout=timeout)
        if r.status_code != 200:
            continue

        market = r.json()
        prob = market.get("probability", 0.5)

        # YES shares are valued at the probability, NO shares at its complement.
        yes_value = data["yes_shares"] * prob
        no_value = data["no_shares"] * (1 - prob)
        total_value = yes_value + no_value
        pnl = total_value - data["invested"]

        positions.append({
            "market_id": mid,
            "market_question": data["question"],
            "yes_shares": data["yes_shares"],
            "no_shares": data["no_shares"],
            "invested": data["invested"],
            "current_value": total_value,
            "probability": prob,
            "pnl": pnl,
            "url": market.get("url", "")
        })

    return positions, balance

# Usage example: print the mana balance followed by one entry per market.
positions, balance = get_manifold_positions()
print(f"Mana余额: {balance}")

for p in positions:
    # The leading "\n" escape separates markets with a blank line; the literal
    # was split across two source lines, which is a syntax error.
    print(f"\n{p['market_question'][:50]}")
    print(f"  是: {p['yes_shares']:.1f} 份额, 否: {p['no_shares']:.1f} 份额")
    print(f"  价值: {p['current_value']:.0f}M, 盈亏: {p['pnl']:+.0f}M")

统一投资组合同步

#!/usr/bin/env python3
"""
从所有预测市场同步投资组合
"""

import os
from dataclasses import dataclass
from typing import List

@dataclass
class Position:
    """One position normalized to a common shape across all platforms."""

    # Platform identifier; this file uses "polymarket", "kalshi", "manifold".
    platform: str
    # Platform-specific market identifier.
    market_id: str
    # Human-readable market title/question.
    market_question: str
    # Outcome held (the outcome name on Polymarket, "是"/"否" elsewhere in this file).
    side: str
    # Number of shares/contracts held.
    size: float
    # Average entry price per share.
    avg_price: float
    # Latest price per share.
    current_price: float
    # Current value of the position.
    value: float
    # Profit and loss in the platform's currency.
    pnl: float
    # Profit and loss as a percentage.
    pnl_pct: float

def sync_all_portfolios() -> List[Position]:
    """Collect positions from every configured platform.

    A platform is queried only when its environment variable is set:
    ``POLY_FUNDER_ADDRESS`` (Polymarket), ``KALSHI_EMAIL`` (Kalshi, together
    with ``KALSHI_PASSWORD``) and ``MANIFOLD_API_KEY`` (Manifold).

    Returns:
        All positions as ``Position`` objects, in the order Polymarket,
        Kalshi, Manifold.
    """
    all_positions = []

    def priced(platform, p, side):
        # Shared conversion for platforms that report an average entry price.
        # A zero/missing average is replaced by 0.01 to avoid dividing by zero.
        avg = p["avg_price"] or 0.01
        pnl_pct = ((p["current_price"] - avg) / avg * 100) if avg > 0 else 0
        return Position(
            platform=platform,
            market_id=p["market_id"],
            market_question=p["market_question"],
            side=side,
            size=p["size"],
            avg_price=avg,
            current_price=p["current_price"],
            value=p["value"],
            pnl=p["pnl"],
            pnl_pct=pnl_pct
        )

    # Polymarket
    poly_wallet = os.getenv("POLY_FUNDER_ADDRESS")
    if poly_wallet:
        for p in get_all_polymarket_positions(poly_wallet):
            all_positions.append(priced("polymarket", p, p["outcome"]))

    # Kalshi
    kalshi_email = os.getenv("KALSHI_EMAIL")
    if kalshi_email:
        kalshi = KalshiSync(kalshi_email, os.getenv("KALSHI_PASSWORD"))
        for p in kalshi.get_positions():
            all_positions.append(priced("kalshi", p, p["side"]))

    # Manifold
    if os.getenv("MANIFOLD_API_KEY"):
        mani_positions, _ = get_manifold_positions()
        for p in mani_positions:
            invested = p["invested"] or 1
            pnl_pct = (p["pnl"] / invested * 100) if invested > 0 else 0

            prob = p["probability"]
            held = [
                (side, shares, price)
                for side, shares, price in (
                    ("是", p["yes_shares"], prob),
                    ("否", p["no_shares"], 1 - prob),
                )
                if shares > 0
            ]

            # The market-level P&L is split evenly across the sides actually
            # held. Always halving it lost half the P&L whenever only one
            # side was held.
            for side, shares, price in held:
                all_positions.append(Position(
                    platform="manifold",
                    market_id=p["market_id"],
                    market_question=p["market_question"],
                    side=side,
                    size=shares,
                    avg_price=0,  # not tracked per side for Manifold
                    current_price=price,
                    value=shares * price,
                    pnl=p["pnl"] / len(held),
                    pnl_pct=pnl_pct
                ))

    return all_positions

# Run the sync
positions = sync_all_portfolios()

# Print the summary
total_value = sum(p.value for p in positions)
total_pnl = sum(p.pnl for p in positions)

# The "\n" escapes below restore the intended blank lines; the literals were
# split across two source lines, which is a syntax error.
print(f"\n{'='*60}")
print(f"投资组合摘要")
print(f"{'='*60}")
print(f"总价值: ${total_value:.2f}")
print(f"总盈亏: ${total_pnl:+.2f}")
print(f"{'='*60}")

# Per-platform breakdown; platforms without positions are skipped.
for platform in ["polymarket", "kalshi", "manifold"]:
    plat_positions = [p for p in positions if p.platform == platform]
    if plat_positions:
        plat_value = sum(p.value for p in plat_positions)
        plat_pnl = sum(p.pnl for p in plat_positions)
        print(f"\n{platform.upper()}: ${plat_value:.2f} (盈亏: ${plat_pnl:+.2f})")

        for p in plat_positions:
            print(f"  {p.market_question[:35]}")
            print(f"    {p.side}: {p.size:.1f} @ {p.avg_price:.2f} -> {p.current_price:.2f}")
            print(f"    价值: ${p.value:.2f}, 盈亏: ${p.pnl:+.2f} ({p.pnl_pct:+.1f}%)")

自动同步的Cron任务

#!/usr/bin/env python3
"""
每小时运行以同步头寸到数据库
"""

import sqlite3
from datetime import datetime

def sync_to_db():
    """Sync all positions into the SQLite database at ~/.clodds/clodds.db.

    Every position returned by ``sync_all_portfolios()`` is written to the
    ``positions`` table with ``INSERT OR REPLACE``, stamped with the time of
    this sync. The table is expected to exist already. All rows are written
    in one transaction, which is rolled back if any insert fails.
    """
    # Local imports keep this snippet self-contained.
    import os
    from contextlib import closing

    # sqlite3 does not expand "~"; without expanduser() the path would point
    # into a directory literally named "~" under the working directory.
    db_path = os.path.expanduser("~/.clodds/clodds.db")

    # Fetch first so the database is not held open during the network calls.
    positions = sync_all_portfolios()
    updated_at = datetime.now().isoformat()

    rows = [
        (
            p.platform, p.market_id, p.market_question, p.side,
            p.size, p.avg_price, p.current_price, p.value, p.pnl,
            updated_at,
        )
        for p in positions
    ]

    # closing() guarantees the connection is closed even on error; the inner
    # "with conn" commits on success and rolls back on an exception.
    with closing(sqlite3.connect(db_path)) as conn, conn:
        conn.executemany("""
            INSERT OR REPLACE INTO positions
            (platform, market_id, market_question, side, size, avg_price, current_price, value, pnl, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, rows)

    print(f"在 {datetime.now()} 同步了 {len(positions)} 个头寸")

if __name__ == "__main__":
    sync_to_db()

添加到crontab:

# 每小时同步一次
0 * * * * cd /path/to/clodds && python3 -c "from skills.portfolio_sync import sync_to_db; sync_to_db()"