name: 回测框架 description: 构建稳健的交易策略回测系统,妥善处理前瞻性偏差、幸存者偏差和交易成本。适用于开发交易算法、验证策略性能或构建回测基础设施。
回测框架
构建稳健、生产级的回测系统,避免常见陷阱,并提供可靠策略性能估计。
何时使用此技能
- 开发交易策略回测
- 构建回测基础设施
- 验证策略性能
- 避免常见回测偏差
- 实施前向分析
- 比较策略替代方案
核心概念
1. 回测偏差
| 偏差 | 描述 | 缓解措施 |
|---|---|---|
| 前瞻性 | 使用未来信息 | 点对点数据 |
| 幸存者 | 仅测试幸存证券 | 使用已退市证券 |
| 过拟合 | 对历史曲线拟合 | 样本外测试 |
| 选择 | 挑选策略 | 预注册 |
| 交易 | 忽略交易成本 | 现实成本模型 |
2. 正确回测结构
历史数据
│
▼
┌─────────────────────────────────────────┐
│ 训练集 │
│ (策略开发与优化) │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 验证集 │
│ (参数选择,无偷窥) │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 测试集 │
│ (最终性能评估) │
└─────────────────────────────────────────┘
3. 前向分析
窗口 1: [训练──────][测试]
窗口 2: [训练──────][测试]
窗口 3: [训练──────][测试]
窗口 4: [训练──────][测试]
─────▶ 时间
实现模式
模式 1: 事件驱动回测器
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Dict, List, Optional
import pandas as pd
import numpy as np
class OrderSide(Enum):
BUY = "buy"
SELL = "sell"
class OrderType(Enum):
MARKET = "market"
LIMIT = "limit"
STOP = "stop"
@dataclass
class Order:
symbol: str
side: OrderSide
quantity: Decimal
order_type: OrderType
limit_price: Optional[Decimal] = None
stop_price: Optional[Decimal] = None
timestamp: Optional[datetime] = None
@dataclass
class Fill:
order: Order
fill_price: Decimal
fill_quantity: Decimal
commission: Decimal
slippage: Decimal
timestamp: datetime
@dataclass
class Position:
symbol: str
quantity: Decimal = Decimal("0")
avg_cost: Decimal = Decimal("0")
realized_pnl: Decimal = Decimal("0")
def update(self, fill: Fill) -> None:
if fill.order.side == OrderSide.BUY:
new_quantity = self.quantity + fill.fill_quantity
if new_quantity != 0:
self.avg_cost = (
(self.quantity * self.avg_cost + fill.fill_quantity * fill.fill_price)
/ new_quantity
)
self.quantity = new_quantity
else:
self.realized_pnl += fill.fill_quantity * (fill.fill_price - self.avg_cost)
self.quantity -= fill.fill_quantity
@dataclass
class Portfolio:
cash: Decimal
positions: Dict[str, Position] = field(default_factory=dict)
def get_position(self, symbol: str) -> Position:
if symbol not in self.positions:
self.positions[symbol] = Position(symbol=symbol)
return self.positions[symbol]
def process_fill(self, fill: Fill) -> None:
position = self.get_position(fill.order.symbol)
position.update(fill)
if fill.order.side == OrderSide.BUY:
self.cash -= fill.fill_price * fill.fill_quantity + fill.commission
else:
self.cash += fill.fill_price * fill.fill_quantity - fill.commission
def get_equity(self, prices: Dict[str, Decimal]) -> Decimal:
equity = self.cash
for symbol, position in self.positions.items():
if position.quantity != 0 and symbol in prices:
equity += position.quantity * prices[symbol]
return equity
class Strategy(ABC):
@abstractmethod
def on_bar(self, timestamp: datetime, data: pd.DataFrame) -> List[Order]:
pass
@abstractmethod
def on_fill(self, fill: Fill) -> None:
pass
class ExecutionModel(ABC):
@abstractmethod
def execute(self, order: Order, bar: pd.Series) -> Optional[Fill]:
pass
class SimpleExecutionModel(ExecutionModel):
def __init__(self, slippage_bps: float = 10, commission_per_share: float = 0.01):
self.slippage_bps = slippage_bps
self.commission_per_share = commission_per_share
def execute(self, order: Order, bar: pd.Series) -> Optional[Fill]:
if order.order_type == OrderType.MARKET:
base_price = Decimal(str(bar["open"]))
# 应用滑点
slippage_mult = 1 + (self.slippage_bps / 10000)
if order.side == OrderSide.BUY:
fill_price = base_price * Decimal(str(slippage_mult))
else:
fill_price = base_price / Decimal(str(slippage_mult))
commission = order.quantity * Decimal(str(self.commission_per_share))
slippage = abs(fill_price - base_price) * order.quantity
return Fill(
order=order,
fill_price=fill_price,
fill_quantity=order.quantity,
commission=commission,
slippage=slippage,
timestamp=bar.name
)
return None
class Backtester:
def __init__(
self,
strategy: Strategy,
execution_model: ExecutionModel,
initial_capital: Decimal = Decimal("100000")
):
self.strategy = strategy
self.execution_model = execution_model
self.portfolio = Portfolio(cash=initial_capital)
self.equity_curve: List[tuple] = []
self.trades: List[Fill] = []
def run(self, data: pd.DataFrame) -> pd.DataFrame:
"""在具有DatetimeIndex的OHLCV数据上运行回测。"""
pending_orders: List[Order] = []
for timestamp, bar in data.iterrows():
# 以今日价格执行待处理订单
for order in pending_orders:
fill = self.execution_model.execute(order, bar)
if fill:
self.portfolio.process_fill(fill)
self.strategy.on_fill(fill)
self.trades.append(fill)
pending_orders.clear()
# 获取当前价格以计算权益
prices = {data.index.name or "default": Decimal(str(bar["close"]))}
equity = self.portfolio.get_equity(prices)
self.equity_curve.append((timestamp, float(equity)))
# 为下一个条形生成新订单
new_orders = self.strategy.on_bar(timestamp, data.loc[:timestamp])
pending_orders.extend(new_orders)
return self._create_results()
def _create_results(self) -> pd.DataFrame:
equity_df = pd.DataFrame(self.equity_curve, columns=["timestamp", "equity"])
equity_df.set_index("timestamp", inplace=True)
equity_df["returns"] = equity_df["equity"].pct_change()
return equity_df
模式 2: 向量化回测器(快速)
import pandas as pd
import numpy as np
from typing import Callable, Dict, Any
class VectorizedBacktester:
"""用于简单策略的快速向量化回测器。"""
def __init__(
self,
initial_capital: float = 100000,
commission: float = 0.001, # 0.1%
slippage: float = 0.0005 # 0.05%
):
self.initial_capital = initial_capital
self.commission = commission
self.slippage = slippage
def run(
self,
prices: pd.DataFrame,
signal_func: Callable[[pd.DataFrame], pd.Series]
) -> Dict[str, Any]:
"""
使用信号函数运行回测。
参数:
prices: 包含'close'列的DataFrame
signal_func: 返回位置信号(-1, 0, 1)的函数
返回:
包含结果的字典
"""
# 生成信号(移位以避免前瞻性)
signals = signal_func(prices).shift(1).fillna(0)
# 计算收益
returns = prices["close"].pct_change()
# 计算带成本的策略收益
position_changes = signals.diff().abs()
trading_costs = position_changes * (self.commission + self.slippage)
strategy_returns = signals * returns - trading_costs
# 构建权益曲线
equity = (1 + strategy_returns).cumprod() * self.initial_capital
# 计算指标
results = {
"equity": equity,
"returns": strategy_returns,
"signals": signals,
"metrics": self._calculate_metrics(strategy_returns, equity)
}
return results
def _calculate_metrics(
self,
returns: pd.Series,
equity: pd.Series
) -> Dict[str, float]:
"""计算性能指标。"""
total_return = (equity.iloc[-1] / self.initial_capital) - 1
annual_return = (1 + total_return) ** (252 / len(returns)) - 1
annual_vol = returns.std() * np.sqrt(252)
sharpe = annual_return / annual_vol if annual_vol > 0 else 0
# 回撤
rolling_max = equity.cummax()
drawdown = (equity - rolling_max) / rolling_max
max_drawdown = drawdown.min()
# 胜率
winning_days = (returns > 0).sum()
total_days = (returns != 0).sum()
win_rate = winning_days / total_days if total_days > 0 else 0
return {
"total_return": total_return,
"annual_return": annual_return,
"annual_volatility": annual_vol,
"sharpe_ratio": sharpe,
"max_drawdown": max_drawdown,
"win_rate": win_rate,
"num_trades": int((returns != 0).sum())
}
# 示例用法
def momentum_signal(prices: pd.DataFrame, lookback: int = 20) -> pd.Series:
"""简单动量策略:当价格>SMA时做多,否则平仓。"""
sma = prices["close"].rolling(lookback).mean()
return (prices["close"] > sma).astype(int)
# 运行回测
# backtester = VectorizedBacktester()
# results = backtester.run(price_data, lambda p: momentum_signal(p, 50))
模式 3: 前向优化
from typing import Callable, Dict, List, Tuple, Any
import pandas as pd
import numpy as np
from itertools import product
class WalkForwardOptimizer:
"""使用锚定或滚动窗口进行前向分析。"""
def __init__(
self,
train_period: int,
test_period: int,
anchored: bool = False,
n_splits: int = None
):
"""
参数:
train_period: 训练窗口中的条形数
test_period: 测试窗口中的条形数
anchored: 如果为True,训练始终从开始处开始
n_splits: 训练/测试分割数(如果为None则自动计算)
"""
self.train_period = train_period
self.test_period = test_period
self.anchored = anchored
self.n_splits = n_splits
def generate_splits(
self,
data: pd.DataFrame
) -> List[Tuple[pd.DataFrame, pd.DataFrame]]:
"""生成训练/测试分割。"""
splits = []
n = len(data)
if self.n_splits:
step = (n - self.train_period) // self.n_splits
else:
step = self.test_period
start = 0
while start + self.train_period + self.test_period <= n:
if self.anchored:
train_start = 0
else:
train_start = start
train_end = start + self.train_period
test_end = min(train_end + self.test_period, n)
train_data = data.iloc[train_start:train_end]
test_data = data.iloc[train_end:test_end]
splits.append((train_data, test_data))
start += step
return splits
def optimize(
self,
data: pd.DataFrame,
strategy_func: Callable,
param_grid: Dict[str, List],
metric: str = "sharpe_ratio"
) -> Dict[str, Any]:
"""
运行前向优化。
参数:
data: 完整数据集
strategy_func: 函数(data, **params) -> 结果字典
param_grid: 要测试的参数组合
metric: 要优化的指标
返回:
所有测试时期的组合结果
"""
splits = self.generate_splits(data)
all_results = []
optimal_params_history = []
for i, (train_data, test_data) in enumerate(splits):
# 在训练数据上优化
best_params, best_metric = self._grid_search(
train_data, strategy_func, param_grid, metric
)
optimal_params_history.append(best_params)
# 使用最优参数测试
test_results = strategy_func(test_data, **best_params)
test_results["split"] = i
test_results["params"] = best_params
all_results.append(test_results)
print(f"分割 {i+1}/{len(splits)}: "
f"最佳 {metric}={best_metric:.4f}, 参数={best_params}")
return {
"split_results": all_results,
"param_history": optimal_params_history,
"combined_equity": self._combine_equity_curves(all_results)
}
def _grid_search(
self,
data: pd.DataFrame,
strategy_func: Callable,
param_grid: Dict[str, List],
metric: str
) -> Tuple[Dict, float]:
"""网格搜索最佳参数。"""
best_params = None
best_metric = -np.inf
# 生成所有参数组合
param_names = list(param_grid.keys())
param_values = list(param_grid.values())
for values in product(*param_values):
params = dict(zip(param_names, values))
results = strategy_func(data, **params)
if results["metrics"][metric] > best_metric:
best_metric = results["metrics"][metric]
best_params = params
return best_params, best_metric
def _combine_equity_curves(
self,
results: List[Dict]
) -> pd.Series:
"""合并所有测试时期的权益曲线。"""
combined = pd.concat([r["equity"] for r in results])
return combined
模式 4: 蒙特卡洛分析
import numpy as np
import pandas as pd
from typing import Dict, List
class MonteCarloAnalyzer:
"""用于策略稳健性的蒙特卡洛模拟。"""
def __init__(self, n_simulations: int = 1000, confidence: float = 0.95):
self.n_simulations = n_simulations
self.confidence = confidence
def bootstrap_returns(
self,
returns: pd.Series,
n_periods: int = None
) -> np.ndarray:
"""
通过重采样收益进行自助模拟。
参数:
returns: 历史收益序列
n_periods: 每个模拟的长度(默认:与输入相同)
返回:
形状为(n_simulations, n_periods)的数组
"""
if n_periods is None:
n_periods = len(returns)
simulations = np.zeros((self.n_simulations, n_periods))
for i in range(self.n_simulations):
# 有放回重采样
simulated_returns = np.random.choice(
returns.values,
size=n_periods,
replace=True
)
simulations[i] = simulated_returns
return simulations
def analyze_drawdowns(
self,
returns: pd.Series
) -> Dict[str, float]:
"""通过模拟分析回撤分布。"""
simulations = self.bootstrap_returns(returns)
max_drawdowns = []
for sim_returns in simulations:
equity = (1 + sim_returns).cumprod()
rolling_max = np.maximum.accumulate(equity)
drawdowns = (equity - rolling_max) / rolling_max
max_drawdowns.append(drawdowns.min())
max_drawdowns = np.array(max_drawdowns)
return {
"expected_max_dd": np.mean(max_drawdowns),
"median_max_dd": np.median(max_drawdowns),
f"worst_{int(self.confidence*100)}pct": np.percentile(
max_drawdowns, (1 - self.confidence) * 100
),
"worst_case": max_drawdowns.min()
}
def probability_of_loss(
self,
returns: pd.Series,
holding_periods: List[int] = [21, 63, 126, 252]
) -> Dict[int, float]:
"""计算不同持有期下的亏损概率。"""
results = {}
for period in holding_periods:
if period > len(returns):
continue
simulations = self.bootstrap_returns(returns, period)
total_returns = (1 + simulations).prod(axis=1) - 1
prob_loss = (total_returns < 0).mean()
results[period] = prob_loss
return results
def confidence_interval(
self,
returns: pd.Series,
periods: int = 252
) -> Dict[str, float]:
"""计算未来收益的置信区间。"""
simulations = self.bootstrap_returns(returns, periods)
total_returns = (1 + simulations).prod(axis=1) - 1
lower = (1 - self.confidence) / 2
upper = 1 - lower
return {
"expected": total_returns.mean(),
"lower_bound": np.percentile(total_returns, lower * 100),
"upper_bound": np.percentile(total_returns, upper * 100),
"std": total_returns.std()
}
性能指标
def calculate_metrics(returns: pd.Series, rf_rate: float = 0.02) -> Dict[str, float]:
"""计算全面的性能指标。"""
# 年化因子(假设日收益)
ann_factor = 252
# 基本指标
total_return = (1 + returns).prod() - 1
annual_return = (1 + total_return) ** (ann_factor / len(returns)) - 1
annual_vol = returns.std() * np.sqrt(ann_factor)
# 风险调整收益
sharpe = (annual_return - rf_rate) / annual_vol if annual_vol > 0 else 0
# 索提诺(下行偏差)
downside_returns = returns[returns < 0]
downside_vol = downside_returns.std() * np.sqrt(ann_factor)
sortino = (annual_return - rf_rate) / downside_vol if downside_vol > 0 else 0
# 卡尔马比率
equity = (1 + returns).cumprod()
rolling_max = equity.cummax()
drawdowns = (equity - rolling_max) / rolling_max
max_drawdown = drawdowns.min()
calmar = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0
# 胜率和盈亏比
wins = returns[returns > 0]
losses = returns[returns < 0]
win_rate = len(wins) / len(returns[returns != 0]) if len(returns[returns != 0]) > 0 else 0
profit_factor = wins.sum() / abs(losses.sum()) if losses.sum() != 0 else np.inf
return {
"total_return": total_return,
"annual_return": annual_return,
"annual_volatility": annual_vol,
"sharpe_ratio": sharpe,
"sortino_ratio": sortino,
"calmar_ratio": calmar,
"max_drawdown": max_drawdown,
"win_rate": win_rate,
"profit_factor": profit_factor,
"num_trades": int((returns != 0).sum())
}
最佳实践
应做事项
- 使用点对点数据 - 避免前瞻性偏差
- 包含交易成本 - 现实估计
- 测试样本外 - 始终保留数据
- 使用前向分析 - 不仅是训练/测试
- 蒙特卡洛分析 - 理解不确定性
禁忌事项
- 不要过拟合 - 限制参数
- 不要忽略幸存者偏差 - 包括已退市
- 不要随意使用调整数据 - 理解调整
- 不要在完整历史上优化 - 保留测试集
- 不要忽略容量 - 市场影响重要