当回测说"年化 80%"时,你在为什么付出代价?
"策略在回测里赚钱,在实盘里亏钱"——这不是策略的错,是你看不见回测框架的假设边界。
每一个量化交易者迟早会遇到这个时刻:对着历史数据跑出的漂亮曲线信心满满,结果一上实盘就被打得满地找牙。有人怪市场变了,有人怪滑点太狠,但真正的问题往往藏在回测框架的底层假设里——你用的是什么引擎,它牺牲了什么来换取速度?
回测引擎主要分为两大流派:向量化回测和事件驱动回测。前者代码简洁、速度快,是大多数回测框架的选择;后者实现复杂,但能模拟更真实的市场微观结构。今天,我们从零开始,设计一个事件驱动回测引擎。
一、向量化回测的三个致命假设
在动手之前,先理解为什么我们需要事件驱动。
向量化回测的典型代码是这样的:
signals = (
pd.DataFrame({'price': prices})
.assign(sma20=lambda x: x['price'].rolling(20).mean())
.assign(sma50=lambda x: x['price'].rolling(50).mean())
.assign(position=lambda x: np.where(x['sma20'] > x['sma50'], 1, -1))
)
returns = signals['position'].shift(1) * signals['price'].pct_change()
这段代码优雅、简洁,一行 returns = ... 就完成了整个回测。但优雅是有代价的:
假设一:信号与成交无缝衔接。 代码中 shift(1) 假设你能以第二天开盘价成交。但实盘中,订单从发出到成交有延迟——可能是 100ms,可能是 5 秒。向量化回测无法模拟这种延迟带来的影响。
假设二:成交价就是报价。 returns = position.shift(1) * price.pct_change() 假设你能以当根 K 线的收盘价成交,且成交数量对价格没有冲击。但实盘中,大单会冲击市场,你买的时候价格已经在涨了。
假设三:数据采样代表真实行情。 如果你用日线数据做回测,框架假设整天只有一个价格序列。但真实的盘中可能有无数次价格穿越你的止损线又收回来,你永远不会知道——因为你只看日线。
这三个假设叠加在一起,就是"回测过度拟合"的根源:你的策略在人工构造的、无摩擦的、无延迟的市场里运行,怎么可能不出错?
二、事件驱动框架的核心思想
事件驱动回测的核心只有一个:时间是真实的,事件是顺序的。
在向量化框架里,所有时间点的数据同时存在,你可以随意索引。但在事件驱动框架里,时间是不可逆的——你只能看见当前时刻及之前的事件,不能"偷看"未来。
事件驱动回测的典型循环:
while not end_of_data:
event = event_queue.get_next_event() # 获取下一个时间戳的事件
if event.type == EventType.MARKET_DATA:
strategy.on_market_data(event) # 策略接收数据,产生信号
for order in strategy.orders:
risk_manager.check(order) # 风控检查
order_router.submit(order) # 提交订单
elif event.type == EventType.ORDER_UPDATE:
broker.process_update(event) # 处理订单状态变更
elif event.type == EventType.FILL:
portfolio.update(event) # 更新持仓和资金
event_queue.pop() # 事件出队
这个循环的关键在于:每个时间戳,事件依次流转。你无法在处理 FILL 事件时"顺带"看看下一根 K 线是什么。这意味着你必须在设计框架时就想清楚:每个事件类型需要什么信息,谁负责产生,谁负责消费。
三、事件系统设计:事件类型与层次结构
事件驱动框架的第一个组件是事件类型定义。
from dataclasses import dataclass
from enum import Enum, auto
from datetime import datetime
from typing import Optional, Any
class EventType(Enum):
"""事件类型枚举"""
MARKET_DATA = auto() # 市场数据事件
SIGNAL = auto() # 策略信号事件
ORDER_NEW = auto() # 新订单提交
ORDER_SUBMITTED = auto() # 订单已提交至交易所
ORDER_PARTIAL_FILL = auto() # 部分成交
ORDER_FILLED = auto() # 全部成交
ORDER_CANCELLED = auto() # 订单撤销
ORDER_REJECTED = auto() # 订单被拒绝
POSITION_UPDATE = auto() # 持仓更新
CASH_UPDATE = auto() # 资金更新
@dataclass
class Event:
"""事件基类"""
timestamp: datetime
type: EventType
data: Any = None
def __repr__(self):
return f"<Event {self.timestamp} {self.type.name}>"
@dataclass
class MarketDataEvent(Event):
"""市场数据事件"""
symbol: str
open: float
high: float
low: float
close: float
volume: float
def __post_init__(self):
self.type = EventType.MARKET_DATA
这是一个扁平的事件结构,每个事件包含时间戳、类型和数据。在更复杂的框架中,你可能会引入事件层次——比如把 ORDER_FILLED 作为 ORDER_UPDATE 的子类,或者引入事件链来追踪订单从提交到成交的完整生命周期。
但对于大多数回测场景,扁平结构足够用了。
四、撮合引擎:订单簿的设计与撮合算法
撮合引擎是事件驱动回测的核心。没有它,你的订单永远只是"想法";有了它,想法才能变成"成交"。
撮合引擎需要模拟交易所的撮合逻辑:价格优先,时间优先。价格相同的情况下,先提交的订单先成交。
from dataclasses import dataclass, field
from sortedcontainers import SortedList
from enum import Enum
from datetime import datetime
from typing import Dict, List, Optional
from collections import namedtuple
class OrderSide(Enum):
BUY = "BUY"
SELL = "SELL"
class OrderType(Enum):
LIMIT = "LIMIT"
MARKET = "MARKET"
class OrderStatus(Enum):
PENDING = "PENDING"
SUBMITTED = "SUBMITTED"
PARTIAL_FILLED = "PARTIAL_FILLED"
FILLED = "FILLED"
CANCELLED = "CANCELLED"
REJECTED = "REJECTED"
Order = namedtuple('Order', ['id', 'symbol', 'side', 'price', 'quantity', 'timestamp'])
class OrderBook:
"""
限价订单簿
买盘按价格降序排列(高价优先)
卖盘按价格升序排列(低价优先)
同价格按时间戳升序排列(先到先得)
"""
def __init__(self, symbol: str):
self.symbol = symbol
# (price, timestamp, order) - SortedList 用元组比较实现多键排序
self.bids = SortedList(key=lambda x: (-x[0], x[1])) # 买盘:价格降序
self.asks = SortedList(key=lambda x: (x[0], x[1])) # 卖盘:价格升序
def add_order(self, order: Order) -> List['Fill']:
"""添加订单并执行撮合,返回成交列表"""
fills = []
if order.side == OrderSide.BUY:
# 检查卖盘是否有可成交订单
remaining_qty = order.quantity
idx = 0
while remaining_qty > 0 and idx < len(self.asks):
ask_price, _, ask_order = self.asks[idx]
# 买单只能在 ≤ 买价的价格成交
if ask_price > order.price:
break
fill_qty = min(remaining_qty, ask_order.quantity)
fills.append(Fill(
timestamp=order.timestamp,
symbol=self.symbol,
price=ask_price,
quantity=fill_qty,
buy_order_id=order.id,
sell_order_id=ask_order.id
))
remaining_qty -= fill_qty
# 更新对手订单数量
new_ask_order = ask_order._replace(
quantity=ask_order.quantity - fill_qty
)
if new_ask_order.quantity == 0:
self.asks.pop(idx)
else:
self.asks[idx] = (ask_price, ask_order.timestamp, new_ask_order)
idx += 1
# 剩余数量挂单
if remaining_qty > 0:
new_order = order._replace(quantity=remaining_qty)
self.bids.add((new_order.price, new_order.timestamp, new_order))
else: # SELL
# 检查买盘是否有可成交订单
remaining_qty = order.quantity
idx = 0
while remaining_qty > 0 and idx < len(self.bids):
bid_price, _, bid_order = self.bids[idx]
# 卖单只能在 ≥ 卖价的价格成交
if bid_price < order.price:
break
fill_qty = min(remaining_qty, bid_order.quantity)
fills.append(Fill(
timestamp=order.timestamp,
symbol=self.symbol,
price=bid_price,
quantity=fill_qty,
buy_order_id=bid_order.id,
sell_order_id=order.id
))
remaining_qty -= fill_qty
new_bid_order = bid_order._replace(
quantity=bid_order.quantity - fill_qty
)
if new_bid_order.quantity == 0:
self.bids.pop(idx)
else:
self.bids[idx] = (bid_price, bid_order.timestamp, new_bid_order)
idx += 1
if remaining_qty > 0:
new_order = order._replace(quantity=remaining_qty)
self.asks.add((new_order.price, new_order.timestamp, new_order))
return fills
def cancel_order(self, order_id: str) -> bool:
"""撤销订单"""
for book in [self.bids, self.asks]:
for i, (price, ts, order) in enumerate(book):
if order.id == order_id:
book.pop(i)
return True
return False
def get_best_bid(self) -> Optional[float]:
return self.bids[0][0] if self.bids else None
def get_best_ask(self) -> Optional[float]:
return self.asks[0][0] if self.asks else None
Fill = namedtuple('Fill', [
'timestamp', 'symbol', 'price', 'quantity',
'buy_order_id', 'sell_order_id'
])
这段代码的核心逻辑是:遍历对手盘,寻找可成交的订单。可成交的条件是:
- 买单:对手卖价 ≤ 买价(你能以不高于你报价的价格买到)
- 卖单:对手买价 ≥ 卖价(你能以不低于你报价的价格卖出)
撮合完成后,剩余未成交的数量重新挂入订单簿。
五、撮合引擎:完整实现与撮合模式选择
撮合引擎封装订单簿,并提供更高级的接口:
import uuid
from typing import Dict
from dataclasses import dataclass
class MatchingEngine:
"""
撮合引擎
管理多标的订单簿,执行撮合逻辑
"""
def __init__(self, slippage_model=None):
self.order_books: Dict[str, OrderBook] = {}
self.slippge_model = slippage_model # 滑点模型
# 存储所有订单,用于状态追踪
self.orders: Dict[str, Order] = {}
self.fills: List[Fill] = []
def submit_order(self, order: Order) -> List[Fill]:
"""
提交订单
Args:
order: 订单对象
Returns:
成交事件列表
"""
# 初始化订单簿
if order.symbol not in self.order_books:
self.order_books[order.symbol] = OrderBook(order.symbol)
self.orders[order.id] = order
# 市价单转换为限价单
if order.side == OrderSide.BUY:
best_ask = self.order_books[order.symbol].get_best_ask()
effective_price = order.price if order.price > 0 else (
best_ask if best_ask else float('inf')
)
else:
best_bid = self.order_books[order.symbol].get_best_bid()
effective_price = order.price if order.price > 0 else (
best_bid if best_bid else 0
)
effective_order = order._replace(price=effective_price)
fills = self.order_books[order.symbol].add_order(effective_order)
# 应用滑点
if self.slippage_model and fills:
fills = [
fill._replace(
price=self.slippage_model.apply(fill)
)
for fill in fills
]
self.fills.extend(fills)
return fills
def cancel_order(self, order_id: str, symbol: str) -> bool:
"""撤销订单"""
return self.order_books[symbol].cancel_order(order_id)
def get_position(self, symbol: str) -> int:
"""获取持仓数量"""
net = 0
for fill in self.fills:
if fill.symbol == symbol:
if fill.buy_order_id: # 成交发生在买单侧
net += fill.quantity
else:
net -= fill.quantity
return net
撮合模式的选择
上面的撮合引擎是逐笔撮合——每一笔新订单都与对手盘的每一笔挂单逐一撮合。这是最精确的撮合方式,但也是最慢的。
对于不同频率的策略,你需要不同的撮合模式:
| 撮合模式 | 适用频率 | 精度 | 速度 |
|---|---|---|---|
| 逐笔撮合 | 分钟级以下 | 最高 | 最慢 |
| 档位撮合 | 分钟级 | 中等 | 中等 |
| 快照撮合 | 小时级以上 | 较低 | 最快 |
档位撮合的核心思想是:把订单簿按价格档位聚合,一个档位内的所有订单视为一个整体成交。实现上,把 add_order 方法中对 SortedList 的遍历改为对档位数据的直接操作:
# 档位撮合伪代码
best_price_level = self.order_books[symbol].get_best_ask()
available_qty = sum(order.quantity for order in best_price_level.orders)
fill_qty = min(order.quantity, available_qty)
# 整个档位视为瞬间成交
选择哪种撮合模式,取决于你的策略频率和精度要求。没有最好的撮合模式,只有适合你的撮合模式。
六、回测主循环:事件循环与时间推进
现在,我们把撮合引擎、策略、风险管理组装在一起:
import pandas as pd
from datetime import datetime
from typing import List, Optional, Callable
class BacktestEngine:
"""
事件驱动回测引擎
"""
def __init__(
self,
initial_cash: float = 100_000,
commission_rate: float = 0.001,
slippage_model=None
):
self.initial_cash = initial_cash
self.cash = initial_cash
self.commission_rate = commission_rate
# 组件
self.matching_engine = MatchingEngine(slippage_model=slippage_model)
self.strategy: Optional[Callable] = None
self.position = {} # symbol -> quantity
# 事件日志
self.events = []
self.equity_curve = []
# 统计
self.total_trades = 0
self.total_commission = 0
def set_strategy(self, strategy: Callable):
"""设置策略函数
Strategy signature: (engine, market_data) -> List[Order]
"""
self.strategy = strategy
def run(self, market_data: pd.DataFrame):
"""
运行回测
Args:
market_data: DataFrame with columns [timestamp, symbol, open, high, low, close, volume]
"""
market_data = market_data.sort_values('timestamp').reset_index(drop=True)
for _, row in market_data.iterrows():
timestamp = row['timestamp']
symbol = row['symbol'] if 'symbol' in row else market_data['symbol'].iloc[0]
# 构造市场数据事件
md_event = MarketDataEvent(
timestamp=timestamp,
symbol=symbol,
open=row['open'],
high=row['high'],
low=row['low'],
close=row['close'],
volume=row['volume']
)
# 策略生成订单
if self.strategy:
try:
orders = self.strategy(self, md_event)
except Exception as e:
print(f"策略执行错误: {e}")
orders = []
# 风控检查 + 提交订单
for order in orders:
order = self._risk_check(order)
if order:
self._submit_order(order)
# 更新权益曲线
self._update_equity(timestamp)
self._generate_summary()
def _risk_check(self, order: Order) -> Optional[Order]:
"""风控检查"""
# 检查资金
estimated_cost = order.price * order.quantity * (1 + self.commission_rate)
if order.side == OrderSide.BUY and estimated_cost > self.cash:
return None
# 检查做空限制
if order.side == OrderSide.SELL:
current_position = self.position.get(order.symbol, 0)
if current_position < order.quantity:
return None
return order
def _submit_order(self, order: Order):
"""提交订单"""
fills = self.matching_engine.submit_order(order)
for fill in fills:
self._process_fill(fill)
def _process_fill(self, fill: Fill):
"""处理成交"""
commission = fill.price * fill.quantity * self.commission_rate
self.total_commission += commission
if fill.buy_order_id: # 买方成交
cost = fill.price * fill.quantity + commission
self.cash -= cost
self.position[fill.symbol] = self.position.get(fill.symbol, 0) + fill.quantity
else: # 卖方成交
revenue = fill.price * fill.quantity - commission
self.cash += revenue
self.position[fill.symbol] = self.position.get(fill.symbol, 0) - fill.quantity
self.total_trades += 1
def _update_equity(self, timestamp: datetime):
"""更新权益曲线"""
position_value = sum(
pos * self.matching_engine.order_books.get(
sym, OrderBook(sym)
).get_best_bid() or 0
for sym, pos in self.position.items()
)
self.equity_curve.append({
'timestamp': timestamp,
'cash': self.cash,
'position_value': position_value,
'total_equity': self.cash + position_value
})
def _generate_summary(self):
"""生成回测报告"""
print(f"总交易次数: {self.total_trades}")
print(f"总佣金: {self.total_commission:.2f}")
print(f"最终资金: {self.cash:.2f}")
print(f"初始资金: {self.initial_cash:.2f}")
print(f"收益率: {(self.cash / self.initial_cash - 1) * 100:.2f}%")
这段回测引擎的逻辑非常直接:按时间顺序遍历数据,每个时间点执行一次策略 + 订单撮合。关键的改进空间在于:
滑点模型:上面的实现使用固定的佣金率。实盘中,滑点与订单大小、市场流动性相关。更精确的模型需要引入流动性调整和订单大小影响。
多标的处理:上面的实现假设数据只有一个标的。如果有多标的,你需要按时间戳聚合同一时刻的所有标的,然后并行处理。
事件流处理:上面的实现是同步的。如果你想支持实时回测或流式处理,需要引入异步架构。
七、从回测到实盘:架构升级路径
上面的回测引擎是同步的、单线程的,适合日级到小时级的策略。但如果你要做分钟级或秒级的高频策略,你需要异步架构。
import asyncio
from typing import AsyncGenerator
class AsyncBacktestEngine(BacktestEngine):
"""
异步事件驱动回测引擎
支持实时数据流和异步策略
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.event_queue = asyncio.Queue()
self.running = False
async def run_stream(
self,
data_stream: AsyncGenerator[MarketDataEvent, None]
):
"""
异步运行回测
Args:
data_stream: 异步数据流(可以是 WebSocket 连接或文件读取器)
"""
self.running = True
async for event in data_stream:
if not self.running:
break
# 策略执行(异步)
if self.strategy:
strategy_coro = self.strategy(self, event)
if asyncio.iscoroutine(strategy_coro):
orders = await strategy_coro
else:
orders = strategy_coro
# 风控 + 提交订单
for order in orders:
if self._risk_check(order):
await self._submit_order_async(order)
# 更新权益
self._update_equity(event.timestamp)
async def _submit_order_async(self, order: Order):
"""异步提交订单"""
fills = self.matching_engine.submit_order(order)
for fill in fills:
self._process_fill(fill)
def stop(self):
"""停止回测"""
self.running = False
异步架构的核心变化:
- 策略函数可以是
async def,支持await调用外部 API(如 TickDB 的 WebSocket 推送) - 数据源改为异步生成器,可以是文件、网络流或真实的 WebSocket 连接
- 事件处理与数据接收解耦,避免阻塞
如果你的策略需要同时订阅多个数据源(多个标的或多个市场),异步架构几乎是必选项。
八、回测局限性:你在为什么付出代价
回到开篇的问题:为什么回测赚钱,实盘亏钱?
在理解了事件驱动框架的原理后,我们现在可以系统性地回答这个问题:
| 回测假设 | 实盘现实 | 影响 |
|---|---|---|
| 订单立即成交 | 有网络延迟和处理时间 | 低估交易成本 |
| 成交价 = 报价 | 大单冲击市场 | 低估市场冲击 |
| 可交易任意数量 | 流动性约束 | 无法执行大仓位 |
| 数据无缺失 | 可能丢包、断线 | 事件处理不连续 |
| 固定采样频率 | 真实行情连续 | 高频策略严重失真 |
事件驱动框架无法消除这些局限性,但它能让你清晰地看到每个假设在哪里。 当你发现回测结果与实盘差距大时,首先检查的是框架的假设,而不是策略本身。
回测局限性说明:上述回测框架基于历史数据模拟,不代表未来收益。框架中存在以下未充分模拟的因素:滑点采用固定比例(0.1%),未考虑订单大小对流动性的冲击;撮合采用逐笔撮合,与实际交易所的订单簿维护存在差异;未模拟涨跌停、停牌等特殊市场状态;未考虑资金规模对市场的影响。建议在实际使用前进行更严格的样本外测试和实盘模拟。
九、架构选型决策树
最后,一句话总结如何选择回测框架:
- 策略频率 ≥ 小时级,数据量 < 100 万行 → 同步事件驱动框架足够
- **策略频率 < 小时级**,数据量 > 100 万行 → 向量化框架 + 事件验证
- 策略频率 < 分钟级,需要多数据源 → 异步事件驱动框架
- 策略频率秒级或更高 → 你需要专业的 C++/Rust 低延迟框架,这不是 Python 能稳定处理的场景
十、性能扩展:从 Pandas 到 ClickHouse
当数据量超过内存限制时,你需要扩展存储和查询能力。
| 数据规模 | 存储方案 | 查询方式 |
|---|---|---|
| < 10GB | Pandas/Parquet | 全量加载 |
| 10GB - 100GB | Parquet + 分区 | 按时间范围加载 |
| > 100GB | ClickHouse / DuckDB | SQL 查询 + 流式加载 |
如果你需要快速查询 10 年级别的历史 K 线数据进行策略回测,一个常见的方案是使用 TickDB 的 REST API 按需拉取数据:
import os
import requests
import pandas as pd
def fetch_historical_klines(symbol: str, interval: str = "1d", limit: int = 3650):
"""
从 TickDB 获取历史 K 线数据用于回测
API 文档: https://docs.tickdb.ai
Args:
symbol: 交易品种,如 "AAPL.US"
interval: K 线周期,如 "1d", "1h", "1m"
limit: 获取数量
Returns:
DataFrame with [timestamp, open, high, low, close, volume]
"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
headers = {"X-API-Key": api_key}
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
if response.status_code != 200:
raise RuntimeError(f"请求失败: {response.status_code}")
data = response.json()
if data.get("code") != 0:
raise RuntimeError(f"API 错误: {data.get('message')}")
df = pd.DataFrame(data["data"]["klines"])
df["timestamp"] = pd.to_datetime(df["t"], unit="ms")
df = df.rename(columns={
"o": "open", "h": "high", "l": "low",
"c": "close", "v": "volume"
})
return df[["timestamp", "open", "high", "low", "close", "volume"]]
# 使用示例
if __name__ == "__main__":
klines = fetch_historical_klines("AAPL.US", "1d", 3650)
engine = BacktestEngine(
initial_cash=100_000,
commission_rate=0.001
)
# 设置简单的双均线策略
def ma_strategy(engine, md_event):
df = engine.price_history.get(md_event.symbol, pd.DataFrame())
df = pd.concat([df, pd.DataFrame([{
'timestamp': md_event.timestamp,
'close': md_event.close
}])], ignore_index=True)
engine.price_history[md_event.symbol] = df
if len(df) < 50:
return []
ma20 = df['close'].iloc[-20:].mean()
ma50 = df['close'].iloc[-50:].mean()
ma20_prev = df['close'].iloc[-21:-1].mean()
ma50_prev = df['close'].iloc[-51:-1].mean()
position = engine.position.get(md_event.symbol, 0)
if ma20 > ma50 and ma20_prev <= ma50_prev:
return [Order(
id=str(uuid.uuid4()),
symbol=md_event.symbol,
side=OrderSide.BUY,
price=md_event.close,
quantity=100,
timestamp=md_event.timestamp
)]
elif ma20 < ma50 and ma20_prev >= ma50_prev:
return [Order(
id=str(uuid.uuid4()),
symbol=md_event.symbol,
side=OrderSide.SELL,
price=md_event.close,
quantity=position,
timestamp=md_event.timestamp
)]
return []
engine.price_history = {}
engine.set_strategy(ma_strategy)
engine.run(klines)
这段代码展示了完整的回测流程:从 TickDB 获取历史 K 线 → 构造事件 → 运行策略 → 输出结果。对于更复杂的策略,你可以把 ma_strategy 替换成你自己的策略逻辑。
下一步行动
如果你想亲手实现一个回测引擎:
- 从本文的撮合引擎代码开始,添加仓位管理和绩效计算模块
- 用 TickDB 的历史 K 线数据跑一个双均线策略,观察实盘差距
- 加入滑点模型和流动性约束,重新评估策略有效性
如果你已经有回测框架,想升级为异步架构:
- 参考
AsyncBacktestEngine的设计,将策略函数改为async def - 用 TickDB 的 WebSocket 推送替代 REST API,观察延迟差异
如果你需要更长的历史数据做跨周期回测:
TickDB 提供 10 年级别的美股历史 K 线数据,REST API 可按需拉取,适合日线级别的中长期策略回测。访问 tickdb.ai 了解更多。
本文不构成任何投资建议。市场有风险,投资需谨慎。回测结果基于历史数据,不代表未来收益。