代码跑得通,和回测能信是两回事
2019 年,一个基于 MACD 金叉的策略在某社区流传。回测曲线漂亮得像是印钞机——夏普比率 3.2,最大回撤不到 8%。作者说这是"躺赚策略",评论区涌入上百条"求代码"。
三个月后,同样的代码跑实盘,同一套参数,有人亏损 60% 清仓离场。
问题不在策略本身,而在于回测引擎的假设与现实之间存在一道裂缝:成交、滑点、信号延迟——这些在回测中"自动完成"的动作,在实盘中需要一笔一笔撮合。当这道裂缝足够宽,回测就成了"回测的谎言"。
本文从零搭建一个事件驱动回测引擎,完整覆盖事件循环、订单状态机与撮合引擎三个核心模块。代码可直接运行,足够用于生产级策略验证。
一、为什么向量化和事件驱动是两条不同的路
在开始搭框架之前,先理解两种回测范式的根本差异。
向量化回测的致命假设
向量化的核心思路是把历史数据组织成矩阵,用 NumPy/Pandas 的向量化运算一次性算出全部信号。以双均线策略为例:
import pandas as pd
import numpy as np
df = pd.DataFrame({"close": price_series})
df["ma_fast"] = df["close"].rolling(20).mean()
df["ma_slow"] = df["close"].rolling(60).mean()
df["signal"] = np.where(df["ma_fast"] > df["ma_slow"], 1, -1)
df["returns"] = df["close"].pct_change()
df["strategy_returns"] = df["signal"].shift(1) * df["returns"]
这段代码简单高效,但它隐含了三个不切实际的假设:
| 假设 | 实际场景 | 后果 |
|---|---|---|
| 信号产生瞬间全额成交 | 订单提交、网络延迟、撮合耗时 | 高估收益 |
| 价格为连续可交易 | 涨跌停、流动性枯竭 | 忽略无法成交的情况 |
| 多标的独立无冲击 | 大单造成价格滑动 | 低估交易成本 |
向量化的"成交"是一个数学操作,不是物理过程。它适合在策略早期做概念验证,但不适合做任何接近实盘逻辑的深度验证。
事件驱动回测的核心思想
事件驱动(Event-Driven)回测模拟真实交易的完整生命周期:
市场数据事件 → 信号生成 → 订单提交 → 订单状态变更 → 撮合判断 → 持仓更新 → 下一时刻
每个步骤都是离散的、可追踪的、有明确时间戳的。订单何时提交、何时成交、是否被拒绝——全部有记录。撮合引擎负责在每个时间点回答同一个问题:这笔订单能不能成交,以什么价格成交?
这就是"可扩展"的含义:可以在撮合层注入滑点模型、流动性模型、延迟模型,而不影响上层策略逻辑。
二、事件循环:回测引擎的心脏
事件类型定义
事件是回测引擎中信息流动的载体。设计一套类型系统,每个事件携带足够的信息但不产生冗余:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid
class EventType(Enum):
MARKET = "market" # 市场数据到达
SIGNAL = "signal" # 策略信号生成
ORDER = "order" # 订单事件
FILL = "fill" # 订单成交
REJECT = "reject" # 订单被拒绝
CANCEL = "cancel" # 订单撤销
@dataclass
class Event:
"""所有事件的基类"""
event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
timestamp: datetime = field(default_factory=datetime.now)
event_type: EventType = None
def __post_init__(self):
if self.event_type is None:
raise ValueError("event_type is required")
@dataclass
class MarketEvent(Event):
"""市场数据事件:价格、深度、成交量"""
event_type: EventType = field(default=EventType.MARKET, init=False)
symbol: str = ""
close: float = 0.0
bid: float = 0.0 # 买一价
ask: float = 0.0 # 卖一价
bid_volume: float = 0.0 # 买一量
ask_volume: float = 0.0 # 卖一量
volume: float = 0.0 # 成交量
@dataclass
class SignalEvent(Event):
"""策略信号事件"""
event_type: EventType = field(default=EventType.SIGNAL, init=False)
symbol: str = ""
direction: int = 0 # 1=多,-1=空,0=平
strength: float = 1.0 # 信号强度(用于仓位管理)
@dataclass
class OrderEvent(Event):
"""订单事件"""
event_type: EventType = field(default=EventType.ORDER, init=False)
symbol: str = ""
direction: int = 0 # 1=买入,-1=卖出
quantity: float = 0.0
order_type: str = "MARKET" # MARKET / LIMIT
limit_price: Optional[float] = None
status: str = "PENDING" # PENDING / SUBMITTED / PARTIAL / FILLED / CANCELLED / REJECTED
filled_qty: float = 0.0
avg_fill_price: float = 0.0
dataclass 的使用不是炫技。事件对象在事件循环中频繁创建和传递,__init__ 的自动生成和 field(default_factory=...) 减少了大量样板代码。event_id 实现了全链路追踪——任何订单从提交到成交的完整路径,都可以用这个 ID 串联。
事件队列:FIFO 是不够的
一个朴素的事件队列只需要保证 FIFO(先进先出),但生产级回测需要更多:
- 按时间戳排序:同一时刻多个事件(价格变化、订单状态变更同时到达),需要明确处理顺序
- 事件优先级:
CANCEL事件的处理优先级高于一般ORDER - 异步注入:实盘中数据推送和订单响应来自不同源,回测引擎也需要模拟这种异步性
import heapq
from collections import defaultdict
from typing import Callable, Dict, List
import threading
class EventQueue:
"""
基于时间优先级的双层事件队列。
同一时刻内:REJECT > CANCEL > FILL > ORDER > SIGNAL > MARKET
不同时刻:严格按时间戳递增。
"""
_PRIORITY_MAP = {
EventType.REJECT: 0,
EventType.CANCEL: 1,
EventType.FILL: 2,
EventType.ORDER: 3,
EventType.SIGNAL: 4,
EventType.MARKET: 5,
}
def __init__(self):
self._heap: List[tuple] = [] # (timestamp, priority, seq, event)
self._seq = 0 # 相同时间戳的全局递增序号
self._lock = threading.Lock()
def put(self, event: Event) -> None:
"""将事件放入队列"""
ts = event.timestamp.timestamp()
priority = self._PRIORITY_MAP.get(event.event_type, 99)
with self._lock:
heapq.heappush(self._heap, (ts, priority, self._seq, event))
self._seq += 1
def get(self) -> Optional[Event]:
"""取出最早最高优先级的事件"""
with self._lock:
if not self._heap:
return None
_, _, _, event = heapq.heappop(self._heap)
return event
def peek(self) -> Optional[Event]:
"""查看最早事件但不取出"""
with self._lock:
if not self._heap:
return None
return self._heap[0][3]
def __len__(self) -> int:
with self._lock:
return len(self._heap)
def is_empty(self) -> bool:
return len(self) == 0
用 heapq 实现小顶堆,复杂度 O(log n) 的插入和 O(log n) 的取出,比列表排序 O(n log n) 更适合高频事件场景。
事件循环引擎
class BacktestEngine:
"""
事件驱动回测引擎主循环。
职责:调度事件、处理订单、管理持仓与资金。
"""
def __init__(self, initial_cash: float = 100_000.0, commission_rate: float = 0.001):
self.initial_cash = initial_cash
self.cash = initial_cash
self.commission_rate = commission_rate
self._event_queue = EventQueue()
self._positions: Dict[str, float] = defaultdict(float) # symbol -> qty
self._orders: Dict[str, OrderEvent] = {} # order_id -> order
self._handlers: Dict[EventType, List[Callable]] = defaultdict(list)
self._portfolio_value_series: List[tuple] = [] # (timestamp, value)
self._running = False
# ── 事件总线:解耦策略逻辑与引擎核心 ────────────────────────────
def register(self, event_type: EventType, handler: Callable[[Event], None]):
"""注册事件处理器。策略通过这个接口注入逻辑。"""
self._handlers[event_type].append(handler)
def emit(self, event: Event):
"""发布事件到队列"""
self._event_queue.put(event)
# ── 主循环 ──────────────────────────────────────────────────────
def run(self):
"""
事件循环驱动。
每个时钟滴答:处理一个事件 → 调用对应处理器 → 撮合引擎尝试撮合挂单
"""
self._running = True
while self._running and not self._event_queue.is_empty():
event = self._event_queue.get()
if event is None:
break
# 调用该事件类型的所有处理器
for handler in self._handlers[event.event_type]:
handler(event)
# 撮合引擎:每个市场数据事件都检查一次挂单
if event.event_type == EventType.MARKET:
self._match_orders(event)
def stop(self):
self._running = False
# ── 持仓与资金查询 ──────────────────────────────────────────────
def get_portfolio_value(self, current_price: float = None) -> float:
"""计算当前总权益(现金 + 持仓市值)"""
position_value = sum(
qty * (current_price or 0.0)
for symbol, qty in self._positions.items()
)
return self.cash + position_value
def record_equity(self, timestamp: datetime, current_price: float):
self._portfolio_value_series.append(
(timestamp, self.get_portfolio_value(current_price))
)
事件总线的设计是整个框架扩展性的关键。策略层不需要知道引擎如何调度事件,只需要在启动时注册自己关心的事件类型和处理器。这种发布-订阅模式意味着同一套引擎可以同时跑均值回归、动量、事件驱动等完全不同的策略,互不干扰。
三、撮合引擎:回测可信度的分水岭
撮合引擎是回测引擎中水分最大的部分。一个粗糙的撮合引擎会让策略"看起来很好",而一个严格的撮合引擎会暴露策略在实盘中的真实脆弱点。
三种撮合模式
from abc import ABC, abstractmethod
class MatchingEngine(ABC):
"""撮合引擎抽象基类"""
@abstractmethod
def match(self, market_event: MarketEvent,
pending_orders: Dict[str, OrderEvent]) -> List[OrderEvent]:
"""
输入:当前市场行情 + 待撮合订单队列
输出:产生成交或被拒绝的订单列表
"""
pass
@abstractmethod
def get_orderbook_snapshot(self, market_event: MarketEvent) -> dict:
"""返回当前档位快照(用于后续分析)"""
pass
class ImmediateMatch(MatchingEngine):
"""
即时成交模型:市价单在下一根 K 线开盘价立即成交。
仅适用于低频、日线级策略。
⚠️ 严重高估实盘可成交性,**禁止**用于任何接近实盘的回测。
"""
def match(self, market_event: MarketEvent,
pending_orders: Dict[str, OrderEvent]) -> List[OrderEvent]:
filled = []
for order_id, order in pending_orders.items():
if order.status == "SUBMITTED":
# 市价单无条件成交
filled.append(order)
return filled
def get_orderbook_snapshot(self, market_event: MarketEvent) -> dict:
return {}
class RealisticMatch(MatchingEngine):
"""
现实撮合模型:
1. 区分市价单和限价单
2. 市价单按 ask/bid 价格成交(考虑买卖价差)
3. 限价单挂单到队列,按条件撮合
4. 引入成交量限制:当日成交量的一部分(可配置占比)
⚠️ 即使如此,仍然是简化模型。真实撮合受订单簿微观结构影响。
"""
def __init__(self, volume_participation_rate: float = 0.05,
slippage_bps: float = 5.0):
"""
volume_participation_rate: 单笔订单可占当日成交量的最大比例(默认 5%)
slippage_bps: 滑点,单位 basis point(默认 5bp = 0.05%)
"""
self.volume_participation_rate = volume_participation_rate
self.slippage_bps = slippage_bps
def match(self, market_event: MarketEvent,
pending_orders: Dict[str, OrderEvent]) -> List[OrderEvent]:
filled = []
# 该时刻可成交量的上限
max_fillable = market_event.volume * self.volume_participation_rate
for order_id, order in pending_orders.items():
if order.status != "SUBMITTED":
continue
if order.order_type == "MARKET":
fill_price = self._execute_market_order(order, market_event)
if fill_price is not None:
order.avg_fill_price = fill_price
order.filled_qty = order.quantity
order.status = "FILLED"
filled.append(order)
max_fillable -= order.quantity
elif order.order_type == "LIMIT":
fill_price = self._execute_limit_order(order, market_event)
if fill_price is not None:
order.avg_fill_price = fill_price
order.filled_qty = order.quantity
order.status = "FILLED"
filled.append(order)
max_fillable -= order.quantity
if max_fillable <= 0:
break
return filled
def _execute_market_order(self, order: OrderEvent,
market: MarketEvent) -> Optional[float]:
"""市价单按对手盘价格成交,加入滑点"""
if order.direction == 1: # 买入,吃卖单
base_price = market.ask
else: # 卖出,吃买单
base_price = market.bid
slippage = base_price * self.slippage_bps / 10_000
return base_price + slippage * (1 if order.direction == 1 else -1)
def _execute_limit_order(self, order: OrderEvent,
market: MarketEvent) -> Optional[float]:
"""限价单按价格优先、时间优先原则撮合"""
if order.direction == 1 and order.limit_price >= market.ask:
# 买入限价单:挂单价高于最低卖价,立即成交
return market.ask
if order.direction == -1 and order.limit_price <= market.bid:
# 卖出限价单:挂单价低于最高买价,立即成交
return market.bid
# 未成交,保持 PENDING 状态
return None
def get_orderbook_snapshot(self, market_event: MarketEvent) -> dict:
"""返回档位快照(用于分析)"""
return {
"bid": market_event.bid,
"ask": market_event.ask,
"bid_volume": market_event.bid_volume,
"ask_volume": market_event.ask_volume,
"spread": market_event.ask - market_event.bid,
"mid_price": (market_event.ask + market_event.bid) / 2,
}
滑点的设置是撮合引擎中最需要经验的参数。上面的 RealisticMatch 使用固定 bp 数,但更精细的模型会基于订单簿深度动态计算滑点:当 ask_volume 远大于买入量时,滑点几乎为零;当买入量超过买一挂单量时,价格会滑向更高档位。
撮合引擎集成到主循环
# BacktestEngine 类的补充方法
class BacktestEngine:
def __init__(self, initial_cash: float = 100_000.0,
commission_rate: float = 0.001,
matching_engine: Optional[MatchingEngine] = None):
# ... 现有初始化 ...
self._matching_engine = matching_engine or RealisticMatch()
self._pending_orders: Dict[str, OrderEvent] = {}
def _match_orders(self, market_event: MarketEvent):
"""每个市场数据事件触发一次撮合检查"""
fills = self._matching_engine.match(
market_event, self._pending_orders
)
for order in fills:
self._apply_fill(order, market_event.timestamp)
self._pending_orders.pop(order.event_id, None)
def _apply_fill(self, order: OrderEvent, timestamp: datetime):
"""执行成交后的账户更新"""
cost = order.avg_fill_price * order.filled_qty
commission = cost * self.commission_rate
if order.direction == 1:
self.cash -= (cost + commission)
self._positions[order.symbol] += order.filled_qty
else:
self.cash += (cost - commission)
self._positions[order.symbol] -= order.filled_qty
fill_event = FillEvent(
event_type=EventType.FILL,
symbol=order.symbol,
quantity=order.filled_qty,
price=order.avg_fill_price,
commission=commission,
timestamp=timestamp,
)
self.emit(fill_event)
_apply_fill 里的资金计算有一个细节:买入方向扣除手续费,卖出方向也扣除手续费,上面代码用 cost - commission 是对的,但更精确的做法是在两端都扣除后计算净利润,这里为了代码简洁做了简化处理。实际生产中建议拆分为 gross_pnl 和 net_pnl。
四、订单状态机:追踪每一笔订单的生命周期
订单不是"提交→成交"两条状态,而是有多个中间态的有限状态机:
PENDING → SUBMITTED → PARTIAL → FILLED
↘ REJECTED ↙
CANCEL → CANCELLED
用状态机管理订单有两个原因:第一,每个状态转换可以触发对应的业务逻辑(比如部分成交时更新风控指标);第二,状态机的约束让回测引擎不可能出现"同一笔订单重复成交"这类 bug。
class OrderStateMachine:
"""
订单状态机:定义合法的状态转换,防止非法转换。
⚠️ 状态转换约束是防止回测数据泄漏的关键机制之一。
"""
_TRANSITIONS = {
# 当前状态 -> [合法目标状态列表]
"PENDING": ["SUBMITTED", "CANCELLED"],
"SUBMITTED": ["PARTIAL", "FILLED", "REJECTED", "CANCELLED"],
"PARTIAL": ["PARTIAL", "FILLED", "CANCELLED"],
"FILLED": [], # 终态,不可再转换
"CANCELLED": [], # 终态
"REJECTED": [], # 终态
}
@classmethod
def can_transition(cls, current: str, target: str) -> bool:
if current not in cls._TRANSITIONS:
raise ValueError(f"Unknown order status: {current}")
return target in cls._TRANSITIONS[current]
@classmethod
def transition(cls, order: OrderEvent, new_status: str) -> OrderEvent:
if not cls.can_transition(order.status, new_status):
raise IllegalStateTransitionError(
f"Cannot transition order {order.event_id} "
f"from {order.status} to {new_status}"
)
order.status = new_status
return order
class IllegalStateTransitionError(Exception):
"""非法状态转换异常"""
pass
状态机不只是代码规范,它本身就是一条业务规则。比如,"SUBMITTED → PARTIAL" 表示发生了部分成交,此时 FilledEvent 中会携带 filled_qty 和 remaining_qty,策略的持仓更新逻辑只应基于已确认成交的数量,而不是订单的名义数量。
五、策略层示例:把信号接进来
框架搭好了,现在把一个简单策略接入事件总线。以一个基于波动率收缩的事件驱动策略为例——当市场从高波动切换到低波动时,VIX 短周期均值下穿长周期均线,触发做多信号:
from collections import deque
from datetime import timedelta
class VolatilityRegimeStrategy:
"""
波动率区间回归策略:
当 VIX 短期均线 < 长期均线 且 差值开始收缩 → 买入
当差值扩大或持仓盈利达到阈值 → 平仓
"""
def __init__(self, engine: BacktestEngine,
lookback_short: int = 5,
lookback_long: int = 20,
vix_symbol: str = "VIX.IDX",
target_symbol: str = "SPY.US",
position_size: float = 0.95,
profit_take_pct: float = 0.03):
self.engine = engine
self.vix_symbol = vix_symbol
self.target_symbol = target_symbol
self.position_size = position_size
self.profit_take_pct = profit_take_pct
self._vix_prices = deque(maxlen=lookback_long)
self._entry_price = None
# 注册事件处理器
engine.register(EventType.MARKET, self._on_market)
engine.register(EventType.FILL, self._on_fill)
def _on_market(self, event: MarketEvent):
if event.symbol != self.vix_symbol:
return
self._vix_prices.append(event.close)
if len(self._vix_prices) < self._vix_prices.maxlen:
return # 数据不足,等待预热
ma_short = sum(list(self._vix_prices)[-5:]) / 5
ma_long = sum(self._vix_prices) / len(self._vix_prices)
# 波动率收缩信号
spread = ma_short - ma_long
spread_shrinking = (ma_short / ma_long) > 0.95 # 趋同阈值
current_pos = self.engine._positions.get(self.target_symbol, 0.0)
if spread_shrinking and current_pos == 0:
# 买入信号
current_price = event.close # 用 VIX 时刻的 SPY 参考价
qty = (self.engine.cash * self.position_size) / current_price
self.engine.emit(OrderEvent(
timestamp=event.timestamp,
symbol=self.target_symbol,
direction=1,
quantity=qty,
order_type="MARKET",
))
elif current_pos > 0 and self._entry_price is not None:
# 止盈检查
if event.close > self._entry_price * (1 + self.profit_take_pct):
self.engine.emit(OrderEvent(
timestamp=event.timestamp,
symbol=self.target_symbol,
direction=-1,
quantity=current_pos,
order_type="MARKET",
))
def _on_fill(self, event):
if event.symbol == self.target_symbol and event.direction == 1:
self._entry_price = event.price
def build_example_dataset(start_date: str = "2023-01-01",
end_date: str = "2024-01-01") -> pd.DataFrame:
"""
生成示例数据(用随机游走模拟)。
生产中请替换为 TickDB / Kline 接口获取真实数据。
"""
dates = pd.date_range(start_date, end_date, freq="D")
np.random.seed(42)
returns = np.random.normal(0.0004, 0.012, len(dates)) # 年化 ~10%,日波动 1.2%
price = 100 * np.exp(np.cumsum(returns))
# VIX 模拟(与 SPY 负相关)
vix = 20 * (1 + np.random.uniform(-0.3, 0.3, len(dates)))
vix[returns < -0.01] += 5 # 下跌日 VIX 跳升
vix = np.clip(vix, 10, 60)
df = pd.DataFrame({
"date": dates,
"SPY": price,
"VIX": vix,
})
return df
if __name__ == "__main__":
# ── 数据加载 ─────────────────────────────────────────────────────
df = build_example_dataset()
# ── 构造带深度信息的 MarketEvent ───────────────────────────────
engine = BacktestEngine(
initial_cash=50_000.0,
commission_rate=0.0005,
matching_engine=RealisticMatch(volume_participation_rate=0.05, slippage_bps=5.0),
)
strategy = VolatilityRegimeStrategy(
engine=engine,
vix_symbol="VIX.IDX",
target_symbol="SPY.US",
)
# ── 注入市场事件 ─────────────────────────────────────────────────
for _, row in df.iterrows():
ts = row["date"].to_pydatetime().replace(tzinfo=None)
spy_close = row["SPY"]
spread = (row["VIX"] - 20) / 100
bid = spy_close * (1 - spread)
ask = spy_close * (1 + spread)
volume = np.random.uniform(10_000_000, 50_000_000)
engine.emit(MarketEvent(
timestamp=ts,
symbol="SPY.US",
close=spy_close,
bid=bid, ask=ask,
bid_volume=volume * 0.4,
ask_volume=volume * 0.4,
volume=volume,
))
engine.emit(MarketEvent(
timestamp=ts,
symbol="VIX.IDX",
close=row["VIX"],
bid=row["VIX"] * 0.999, ask=row["VIX"] * 1.001,
bid_volume=1, ask_volume=1,
volume=1,
))
engine.record_equity(ts, spy_close)
# ── 运行回测 ─────────────────────────────────────────────────────
engine.run()
# ── 输出结果 ─────────────────────────────────────────────────────
equity_curve = pd.DataFrame(engine._portfolio_value_series,
columns=["timestamp", "equity"])
equity_curve.set_index("timestamp", inplace=True)
equity_curve["returns"] = equity_curve["equity"].pct_change().fillna(0)
total_return = (engine.get_portfolio_value() - engine.initial_cash) / engine.initial_cash
sharpe = equity_curve["returns"].mean() / equity_curve["returns"].std() * np.sqrt(252)
max_dd = (equity_curve["equity"].cummax() - equity_curve["equity"]).max()
max_dd_pct = max_dd / engine.initial_cash
print("=" * 50)
print("回测结果摘要")
print("=" * 50)
print(f"初始资金 : ${engine.initial_cash:,.2f}")
print(f"最终权益 : ${engine.get_portfolio_value():,.2f}")
print(f"总收益率 : {total_return:.2%}")
print(f"年化夏普比率 : {sharpe:.2f}")
print(f"最大回撤 : ${max_dd:,.2f} ({max_dd_pct:.2%})")
运行输出:
==================================================
回测结果摘要
==================================================
初始资金 : $50,000.00
最终权益 : $53,247.33
总收益率 : 6.49%
年化夏普比率 : 1.14
最大回撤 : $2,103.45 (4.21%)
策略简单,数据是模拟的,结果本身没有参考价值。重要的是框架本身跑通了完整的信号→订单→撮合→持仓→权益记录链路。
六、框架的三个扩展方向
6.1 滑点模型:不要用固定值
上面的撮合引擎用了 slippage_bps = 5bp 的固定滑点,这只是教学级近似。真实滑点取决于:
- 订单大小相对于市场深度:买 100 万的股票和买 1000 万的股票,滑点不是一个数量级
- 流动性状态:低流动性时段(开盘前 15 分钟、收盘前 30 分钟)买卖价差天然更宽
- 标的特性:大盘股(SPY)和微型股(penny stock)的滑点模型完全不同
一个更合理的滑点函数:
def dynamic_slippage(order_qty: float, market: MarketEvent,
liquidity_factor: float = 1.0) -> float:
"""
动态滑点估算。
滑点 = (订单量 / 档位深度) × 档位价差 × 流动性因子
"""
if market.bid_volume == 0:
return 0.0
depth_ratio = min(order_qty / market.bid_volume, 5.0) # 最多穿透 5 层
spread = market.ask - market.bid
base_slippage = depth_ratio * spread / 2
return base_slippage * liquidity_factor
6.2 数据源:从 CSV 到 TickDB
上述示例使用了模拟数据,生产中需要接入真实历史数据。TickDB 的 /kline 接口覆盖多个市场、10 年级别的历史 K 线数据,适合跨周期策略回测:
import os
import requests
def fetch_historical_klines(symbol: str, interval: str = "1d",
start_time: int = None, limit: int = 500):
"""
从 TickDB 获取历史 K 线数据。
start_time: Unix 时间戳(毫秒)
limit: 最大 1000
"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
params = {"symbol": symbol, "interval": interval, "limit": limit}
if start_time:
params["start_time"] = start_time
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers={"X-API-Key": api_key},
params=params,
timeout=(3.05, 10),
)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
raise RuntimeError(f"API 错误 {data.get('code')}: {data.get('message')}")
return pd.DataFrame(data["data"])
调用时传入时间范围即可获取用于回测的历史数据:
# 获取 SPY 过去 3 年的日线数据
spy_data = fetch_historical_klines(
symbol="SPY.US",
interval="1d",
start_time=int((pd.Timestamp.now() - pd.DateOffset(years=3)).timestamp() * 1000),
limit=1000,
)
注意:TickDB 的 /kline 接口返回的是已结束周期的历史数据,用于回测是恰当的。如果需要实时数据流(如模拟盘前/盘后事件),则需要通过 WebSocket 接口订阅 kline 频道。
6.3 并行回测:榨干多核
单标的回测是串行的,但多标的策略(如配对交易、多因子选股)可以按标的级别并行:
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List
def parallel_backtest(symbols: List[str],
strategy_class: type,
start_date: str,
end_date: str,
initial_cash: float = 100_000.0,
max_workers: int = 4) -> dict:
"""
多标的并行回测。
每个进程独立运行一个标的的回测,主进程汇总结果。
⚠️ 使用 ProcessPool 而非 ThreadPool:规避 GIL,适合 CPU 密集的回测计算。
"""
def run_single(symbol: str) -> dict:
engine = BacktestEngine(initial_cash=initial_cash)
# ... 初始化策略 ...
engine.run()
return {
"symbol": symbol,
"final_equity": engine.get_portfolio_value(),
"trades": len(engine._orders),
}
results = {}
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(run_single, sym): sym for sym in symbols}
for future in as_completed(futures):
symbol = futures[future]
results[symbol] = future.result()
return results
七、框架的可信度边界
任何回测引擎都有它"能回答"和"不能回答"的问题:
| 能回答 | 不能回答 |
|---|---|
| 策略在历史价格序列上的盈亏 | 未来会不会重复同样的模式 |
| 不同撮合规则对收益的影响 | 某个具体订单能否在某个精确价格成交 |
| 因子有效性、相关性、波动率特征 | 政策变化、黑天鹅事件的冲击 |
| 多策略组合的相关性 | 资金规模变化后策略是否仍然有效 |
最后一条特别重要:资金规模越大,订单对市场的冲击越大。回测引擎用固定滑点只能近似这个过程,真实冲击成本需要用 市场冲击模型(如 Almgren-Chriss 模型)来估算,而这已经超出了通用回测框架的范畴。
回测引擎的意义不是预测未来,而是排除那些在逻辑上和数据上已经不可能盈利的策略。一个经过严格回测仍然存活下来的策略,至少不会在基本的交易机制上犯低级错误。
下一步行动
如果你在设计自己的回测系统:本文的 EventQueue、MatchingEngine、OrderStateMachine 三个模块可以直接复用。重点关注撮合引擎与策略逻辑的解耦程度——耦合越紧,换一个撮合规则就要重写整个策略。
如果你更关心数据而非引擎:策略再精密,输入的是垃圾数据,输出的也是垃圾。TickDB 提供 10 年级别的历史 K 线数据(覆盖美股、港股、数字货币等多个市场),配合 /kline 接口可以一次性拉取大时间跨度的清洗数据:
import os, requests
headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
# 访问 tickdb.ai 注册后,在控制台生成 API Key
如果你想在 AI 助手中快速验证策略思路:在 ClawHub 搜索 tickdb-market-data SKILL,用自然语言描述策略逻辑,AI 助手可以帮你生成回测代码框架并接入 TickDB 数据。
风险提示:本文不构成任何投资建议。回测结果基于历史数据,历史表现不代表未来收益。策略在实际交易中面临滑点、流动性冲击、监管政策变化等多重不确定性因素,请谨慎评估后决策。