代码跑得通,和回测能信是两回事

2019 年,一个基于 MACD 金叉的策略在某社区流传。回测曲线漂亮得像是印钞机——夏普比率 3.2,最大回撤不到 8%。作者说这是"躺赚策略",评论区涌入上百条"求代码"。

三个月后,同样的代码跑实盘,同一套参数,有人亏损 60% 清仓离场。

问题不在策略本身,而在于回测引擎的假设与现实之间存在一道裂缝:成交、滑点、信号延迟——这些在回测中"自动完成"的动作,在实盘中需要一笔一笔撮合。当这道裂缝足够宽,回测就成了"回测的谎言"。

本文从零搭建一个事件驱动回测引擎,完整覆盖事件循环、订单状态机与撮合引擎三个核心模块。代码可直接运行,足够用于生产级策略验证。


一、为什么向量化和事件驱动是两条不同的路

在开始搭框架之前,先理解两种回测范式的根本差异。

向量化回测的致命假设

向量化的核心思路是把历史数据组织成矩阵,用 NumPy/Pandas 的向量化运算一次性算出全部信号。以双均线策略为例:

import pandas as pd
import numpy as np

df = pd.DataFrame({"close": price_series})
df["ma_fast"] = df["close"].rolling(20).mean()
df["ma_slow"] = df["close"].rolling(60).mean()
df["signal"] = np.where(df["ma_fast"] > df["ma_slow"], 1, -1)
df["returns"] = df["close"].pct_change()
df["strategy_returns"] = df["signal"].shift(1) * df["returns"]

这段代码简单高效,但它隐含了三个不切实际的假设:

假设 实际场景 后果
信号产生瞬间全额成交 订单提交、网络延迟、撮合耗时 高估收益
价格为连续可交易 涨跌停、流动性枯竭 忽略无法成交的情况
多标的独立无冲击 大单造成价格滑动 低估交易成本

向量化的"成交"是一个数学操作,不是物理过程。它适合在策略早期做概念验证,但不适合做任何接近实盘逻辑的深度验证。

事件驱动回测的核心思想

事件驱动(Event-Driven)回测模拟真实交易的完整生命周期:

市场数据事件 → 信号生成 → 订单提交 → 订单状态变更 → 撮合判断 → 持仓更新 → 下一时刻

每个步骤都是离散的、可追踪的、有明确时间戳的。订单何时提交、何时成交、是否被拒绝——全部有记录。撮合引擎负责在每个时间点回答同一个问题:这笔订单能不能成交,以什么价格成交?

这就是"可扩展"的含义:可以在撮合层注入滑点模型、流动性模型、延迟模型,而不影响上层策略逻辑。


二、事件循环:回测引擎的心脏

事件类型定义

事件是回测引擎中信息流动的载体。设计一套类型系统,每个事件携带足够的信息但不产生冗余:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import uuid


class EventType(Enum):
    MARKET = "market"          # 市场数据到达
    SIGNAL = "signal"          # 策略信号生成
    ORDER = "order"            # 订单事件
    FILL = "fill"              # 订单成交
    REJECT = "reject"          # 订单被拒绝
    CANCEL = "cancel"          # 订单撤销


@dataclass
class Event:
    """所有事件的基类"""
    event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: datetime = field(default_factory=datetime.now)
    event_type: EventType = None

    def __post_init__(self):
        if self.event_type is None:
            raise ValueError("event_type is required")


@dataclass
class MarketEvent(Event):
    """市场数据事件:价格、深度、成交量"""
    event_type: EventType = field(default=EventType.MARKET, init=False)
    symbol: str = ""
    close: float = 0.0
    bid: float = 0.0          # 买一价
    ask: float = 0.0          # 卖一价
    bid_volume: float = 0.0   # 买一量
    ask_volume: float = 0.0   # 卖一量
    volume: float = 0.0       # 成交量


@dataclass
class SignalEvent(Event):
    """策略信号事件"""
    event_type: EventType = field(default=EventType.SIGNAL, init=False)
    symbol: str = ""
    direction: int = 0        # 1=多,-1=空,0=平
    strength: float = 1.0     # 信号强度(用于仓位管理)


@dataclass
class OrderEvent(Event):
    """订单事件"""
    event_type: EventType = field(default=EventType.ORDER, init=False)
    symbol: str = ""
    direction: int = 0         # 1=买入,-1=卖出
    quantity: float = 0.0
    order_type: str = "MARKET" # MARKET / LIMIT
    limit_price: Optional[float] = None
    status: str = "PENDING"    # PENDING / SUBMITTED / PARTIAL / FILLED / CANCELLED / REJECTED
    filled_qty: float = 0.0
    avg_fill_price: float = 0.0

dataclass 的使用不是炫技。事件对象在事件循环中频繁创建和传递,__init__ 的自动生成和 field(default_factory=...) 减少了大量样板代码。event_id 实现了全链路追踪——任何订单从提交到成交的完整路径,都可以用这个 ID 串联。

事件队列:FIFO 是不够的

一个朴素的事件队列只需要保证 FIFO(先进先出),但生产级回测需要更多:

  • 按时间戳排序:同一时刻多个事件(价格变化、订单状态变更同时到达),需要明确处理顺序
  • 事件优先级CANCEL 事件的处理优先级高于一般 ORDER
  • 异步注入:实盘中数据推送和订单响应来自不同源,回测引擎也需要模拟这种异步性
import heapq
from collections import defaultdict
from typing import Callable, Dict, List
import threading


class EventQueue:
    """
    基于时间优先级的双层事件队列。
    同一时刻内:REJECT > CANCEL > FILL > ORDER > SIGNAL > MARKET
    不同时刻:严格按时间戳递增。
    """

    _PRIORITY_MAP = {
        EventType.REJECT: 0,
        EventType.CANCEL: 1,
        EventType.FILL: 2,
        EventType.ORDER: 3,
        EventType.SIGNAL: 4,
        EventType.MARKET: 5,
    }

    def __init__(self):
        self._heap: List[tuple] = []    # (timestamp, priority, seq, event)
        self._seq = 0                   # 相同时间戳的全局递增序号
        self._lock = threading.Lock()

    def put(self, event: Event) -> None:
        """将事件放入队列"""
        ts = event.timestamp.timestamp()
        priority = self._PRIORITY_MAP.get(event.event_type, 99)
        with self._lock:
            heapq.heappush(self._heap, (ts, priority, self._seq, event))
            self._seq += 1

    def get(self) -> Optional[Event]:
        """取出最早最高优先级的事件"""
        with self._lock:
            if not self._heap:
                return None
            _, _, _, event = heapq.heappop(self._heap)
            return event

    def peek(self) -> Optional[Event]:
        """查看最早事件但不取出"""
        with self._lock:
            if not self._heap:
                return None
            return self._heap[0][3]

    def __len__(self) -> int:
        with self._lock:
            return len(self._heap)

    def is_empty(self) -> bool:
        return len(self) == 0

heapq 实现小顶堆,复杂度 O(log n) 的插入和 O(log n) 的取出,比列表排序 O(n log n) 更适合高频事件场景。

事件循环引擎

class BacktestEngine:
    """
    事件驱动回测引擎主循环。
    职责:调度事件、处理订单、管理持仓与资金。
    """

    def __init__(self, initial_cash: float = 100_000.0, commission_rate: float = 0.001):
        self.initial_cash = initial_cash
        self.cash = initial_cash
        self.commission_rate = commission_rate

        self._event_queue = EventQueue()
        self._positions: Dict[str, float] = defaultdict(float)    # symbol -> qty
        self._orders: Dict[str, OrderEvent] = {}                   # order_id -> order
        self._handlers: Dict[EventType, List[Callable]] = defaultdict(list)
        self._portfolio_value_series: List[tuple] = []             # (timestamp, value)
        self._running = False

    # ── 事件总线:解耦策略逻辑与引擎核心 ────────────────────────────

    def register(self, event_type: EventType, handler: Callable[[Event], None]):
        """注册事件处理器。策略通过这个接口注入逻辑。"""
        self._handlers[event_type].append(handler)

    def emit(self, event: Event):
        """发布事件到队列"""
        self._event_queue.put(event)

    # ── 主循环 ──────────────────────────────────────────────────────

    def run(self):
        """
        事件循环驱动。
        每个时钟滴答:处理一个事件 → 调用对应处理器 → 撮合引擎尝试撮合挂单
        """
        self._running = True
        while self._running and not self._event_queue.is_empty():
            event = self._event_queue.get()
            if event is None:
                break

            # 调用该事件类型的所有处理器
            for handler in self._handlers[event.event_type]:
                handler(event)

            # 撮合引擎:每个市场数据事件都检查一次挂单
            if event.event_type == EventType.MARKET:
                self._match_orders(event)

    def stop(self):
        self._running = False

    # ── 持仓与资金查询 ──────────────────────────────────────────────

    def get_portfolio_value(self, current_price: float = None) -> float:
        """计算当前总权益(现金 + 持仓市值)"""
        position_value = sum(
            qty * (current_price or 0.0)
            for symbol, qty in self._positions.items()
        )
        return self.cash + position_value

    def record_equity(self, timestamp: datetime, current_price: float):
        self._portfolio_value_series.append(
            (timestamp, self.get_portfolio_value(current_price))
        )

事件总线的设计是整个框架扩展性的关键。策略层不需要知道引擎如何调度事件,只需要在启动时注册自己关心的事件类型和处理器。这种发布-订阅模式意味着同一套引擎可以同时跑均值回归、动量、事件驱动等完全不同的策略,互不干扰。


三、撮合引擎:回测可信度的分水岭

撮合引擎是回测引擎中水分最大的部分。一个粗糙的撮合引擎会让策略"看起来很好",而一个严格的撮合引擎会暴露策略在实盘中的真实脆弱点。

三种撮合模式

from abc import ABC, abstractmethod


class MatchingEngine(ABC):
    """撮合引擎抽象基类"""

    @abstractmethod
    def match(self, market_event: MarketEvent,
              pending_orders: Dict[str, OrderEvent]) -> List[OrderEvent]:
        """
        输入:当前市场行情 + 待撮合订单队列
        输出:产生成交或被拒绝的订单列表
        """
        pass

    @abstractmethod
    def get_orderbook_snapshot(self, market_event: MarketEvent) -> dict:
        """返回当前档位快照(用于后续分析)"""
        pass


class ImmediateMatch(MatchingEngine):
    """
    即时成交模型:市价单在下一根 K 线开盘价立即成交。
    仅适用于低频、日线级策略。
    ⚠️ 严重高估实盘可成交性,**禁止**用于任何接近实盘的回测。
    """

    def match(self, market_event: MarketEvent,
              pending_orders: Dict[str, OrderEvent]) -> List[OrderEvent]:
        filled = []
        for order_id, order in pending_orders.items():
            if order.status == "SUBMITTED":
                # 市价单无条件成交
                filled.append(order)
        return filled

    def get_orderbook_snapshot(self, market_event: MarketEvent) -> dict:
        return {}


class RealisticMatch(MatchingEngine):
    """
    现实撮合模型:
    1. 区分市价单和限价单
    2. 市价单按 ask/bid 价格成交(考虑买卖价差)
    3. 限价单挂单到队列,按条件撮合
    4. 引入成交量限制:当日成交量的一部分(可配置占比)
    ⚠️ 即使如此,仍然是简化模型。真实撮合受订单簿微观结构影响。
    """

    def __init__(self, volume_participation_rate: float = 0.05,
                 slippage_bps: float = 5.0):
        """
        volume_participation_rate: 单笔订单可占当日成交量的最大比例(默认 5%)
        slippage_bps: 滑点,单位 basis point(默认 5bp = 0.05%)
        """
        self.volume_participation_rate = volume_participation_rate
        self.slippage_bps = slippage_bps

    def match(self, market_event: MarketEvent,
              pending_orders: Dict[str, OrderEvent]) -> List[OrderEvent]:
        filled = []
        # 该时刻可成交量的上限
        max_fillable = market_event.volume * self.volume_participation_rate

        for order_id, order in pending_orders.items():
            if order.status != "SUBMITTED":
                continue

            if order.order_type == "MARKET":
                fill_price = self._execute_market_order(order, market_event)
                if fill_price is not None:
                    order.avg_fill_price = fill_price
                    order.filled_qty = order.quantity
                    order.status = "FILLED"
                    filled.append(order)
                    max_fillable -= order.quantity

            elif order.order_type == "LIMIT":
                fill_price = self._execute_limit_order(order, market_event)
                if fill_price is not None:
                    order.avg_fill_price = fill_price
                    order.filled_qty = order.quantity
                    order.status = "FILLED"
                    filled.append(order)
                    max_fillable -= order.quantity

            if max_fillable <= 0:
                break

        return filled

    def _execute_market_order(self, order: OrderEvent,
                              market: MarketEvent) -> Optional[float]:
        """市价单按对手盘价格成交,加入滑点"""
        if order.direction == 1:   # 买入,吃卖单
            base_price = market.ask
        else:                      # 卖出,吃买单
            base_price = market.bid

        slippage = base_price * self.slippage_bps / 10_000
        return base_price + slippage * (1 if order.direction == 1 else -1)

    def _execute_limit_order(self, order: OrderEvent,
                             market: MarketEvent) -> Optional[float]:
        """限价单按价格优先、时间优先原则撮合"""
        if order.direction == 1 and order.limit_price >= market.ask:
            # 买入限价单:挂单价高于最低卖价,立即成交
            return market.ask
        if order.direction == -1 and order.limit_price <= market.bid:
            # 卖出限价单:挂单价低于最高买价,立即成交
            return market.bid
        # 未成交,保持 PENDING 状态
        return None

    def get_orderbook_snapshot(self, market_event: MarketEvent) -> dict:
        """返回档位快照(用于分析)"""
        return {
            "bid": market_event.bid,
            "ask": market_event.ask,
            "bid_volume": market_event.bid_volume,
            "ask_volume": market_event.ask_volume,
            "spread": market_event.ask - market_event.bid,
            "mid_price": (market_event.ask + market_event.bid) / 2,
        }

滑点的设置是撮合引擎中最需要经验的参数。上面的 RealisticMatch 使用固定 bp 数,但更精细的模型会基于订单簿深度动态计算滑点:当 ask_volume 远大于买入量时,滑点几乎为零;当买入量超过买一挂单量时,价格会滑向更高档位。

撮合引擎集成到主循环

# BacktestEngine 类的补充方法

class BacktestEngine:
    def __init__(self, initial_cash: float = 100_000.0,
                 commission_rate: float = 0.001,
                 matching_engine: Optional[MatchingEngine] = None):
        # ... 现有初始化 ...
        self._matching_engine = matching_engine or RealisticMatch()
        self._pending_orders: Dict[str, OrderEvent] = {}

    def _match_orders(self, market_event: MarketEvent):
        """每个市场数据事件触发一次撮合检查"""
        fills = self._matching_engine.match(
            market_event, self._pending_orders
        )
        for order in fills:
            self._apply_fill(order, market_event.timestamp)
            self._pending_orders.pop(order.event_id, None)

    def _apply_fill(self, order: OrderEvent, timestamp: datetime):
        """执行成交后的账户更新"""
        cost = order.avg_fill_price * order.filled_qty
        commission = cost * self.commission_rate

        if order.direction == 1:
            self.cash -= (cost + commission)
            self._positions[order.symbol] += order.filled_qty
        else:
            self.cash += (cost - commission)
            self._positions[order.symbol] -= order.filled_qty

        fill_event = FillEvent(
            event_type=EventType.FILL,
            symbol=order.symbol,
            quantity=order.filled_qty,
            price=order.avg_fill_price,
            commission=commission,
            timestamp=timestamp,
        )
        self.emit(fill_event)

_apply_fill 里的资金计算有一个细节:买入方向扣除手续费,卖出方向也扣除手续费,上面代码用 cost - commission 是对的,但更精确的做法是在两端都扣除后计算净利润,这里为了代码简洁做了简化处理。实际生产中建议拆分为 gross_pnlnet_pnl


四、订单状态机:追踪每一笔订单的生命周期

订单不是"提交→成交"两条状态,而是有多个中间态的有限状态机:

PENDING → SUBMITTED → PARTIAL → FILLED
                  ↘ REJECTED ↙
         CANCEL → CANCELLED

用状态机管理订单有两个原因:第一,每个状态转换可以触发对应的业务逻辑(比如部分成交时更新风控指标);第二,状态机的约束让回测引擎不可能出现"同一笔订单重复成交"这类 bug。

class OrderStateMachine:
    """
    订单状态机:定义合法的状态转换,防止非法转换。
    ⚠️ 状态转换约束是防止回测数据泄漏的关键机制之一。
    """

    _TRANSITIONS = {
        # 当前状态 -> [合法目标状态列表]
        "PENDING":    ["SUBMITTED", "CANCELLED"],
        "SUBMITTED":  ["PARTIAL", "FILLED", "REJECTED", "CANCELLED"],
        "PARTIAL":    ["PARTIAL", "FILLED", "CANCELLED"],
        "FILLED":     [],                        # 终态,不可再转换
        "CANCELLED":  [],                        # 终态
        "REJECTED":   [],                        # 终态
    }

    @classmethod
    def can_transition(cls, current: str, target: str) -> bool:
        if current not in cls._TRANSITIONS:
            raise ValueError(f"Unknown order status: {current}")
        return target in cls._TRANSITIONS[current]

    @classmethod
    def transition(cls, order: OrderEvent, new_status: str) -> OrderEvent:
        if not cls.can_transition(order.status, new_status):
            raise IllegalStateTransitionError(
                f"Cannot transition order {order.event_id} "
                f"from {order.status} to {new_status}"
            )
        order.status = new_status
        return order


class IllegalStateTransitionError(Exception):
    """非法状态转换异常"""
    pass

状态机不只是代码规范,它本身就是一条业务规则。比如,"SUBMITTED → PARTIAL" 表示发生了部分成交,此时 FilledEvent 中会携带 filled_qtyremaining_qty,策略的持仓更新逻辑只应基于已确认成交的数量,而不是订单的名义数量。


五、策略层示例:把信号接进来

框架搭好了,现在把一个简单策略接入事件总线。以一个基于波动率收缩的事件驱动策略为例——当市场从高波动切换到低波动时,VIX 短周期均值下穿长周期均线,触发做多信号:

from collections import deque
from datetime import timedelta


class VolatilityRegimeStrategy:
    """
    波动率区间回归策略:
    当 VIX 短期均线 < 长期均线 且 差值开始收缩 → 买入
    当差值扩大或持仓盈利达到阈值 → 平仓
    """

    def __init__(self, engine: BacktestEngine,
                 lookback_short: int = 5,
                 lookback_long: int = 20,
                 vix_symbol: str = "VIX.IDX",
                 target_symbol: str = "SPY.US",
                 position_size: float = 0.95,
                 profit_take_pct: float = 0.03):
        self.engine = engine
        self.vix_symbol = vix_symbol
        self.target_symbol = target_symbol
        self.position_size = position_size
        self.profit_take_pct = profit_take_pct

        self._vix_prices = deque(maxlen=lookback_long)
        self._entry_price = None

        # 注册事件处理器
        engine.register(EventType.MARKET, self._on_market)
        engine.register(EventType.FILL, self._on_fill)

    def _on_market(self, event: MarketEvent):
        if event.symbol != self.vix_symbol:
            return

        self._vix_prices.append(event.close)

        if len(self._vix_prices) < self._vix_prices.maxlen:
            return  # 数据不足,等待预热

        ma_short = sum(list(self._vix_prices)[-5:]) / 5
        ma_long = sum(self._vix_prices) / len(self._vix_prices)

        # 波动率收缩信号
        spread = ma_short - ma_long
        spread_shrinking = (ma_short / ma_long) > 0.95  # 趋同阈值

        current_pos = self.engine._positions.get(self.target_symbol, 0.0)

        if spread_shrinking and current_pos == 0:
            # 买入信号
            current_price = event.close  # 用 VIX 时刻的 SPY 参考价
            qty = (self.engine.cash * self.position_size) / current_price
            self.engine.emit(OrderEvent(
                timestamp=event.timestamp,
                symbol=self.target_symbol,
                direction=1,
                quantity=qty,
                order_type="MARKET",
            ))

        elif current_pos > 0 and self._entry_price is not None:
            # 止盈检查
            if event.close > self._entry_price * (1 + self.profit_take_pct):
                self.engine.emit(OrderEvent(
                    timestamp=event.timestamp,
                    symbol=self.target_symbol,
                    direction=-1,
                    quantity=current_pos,
                    order_type="MARKET",
                ))

    def _on_fill(self, event):
        if event.symbol == self.target_symbol and event.direction == 1:
            self._entry_price = event.price


def build_example_dataset(start_date: str = "2023-01-01",
                          end_date: str = "2024-01-01") -> pd.DataFrame:
    """
    生成示例数据(用随机游走模拟)。
    生产中请替换为 TickDB / Kline 接口获取真实数据。
    """
    dates = pd.date_range(start_date, end_date, freq="D")
    np.random.seed(42)

    returns = np.random.normal(0.0004, 0.012, len(dates))  # 年化 ~10%,日波动 1.2%
    price = 100 * np.exp(np.cumsum(returns))

    # VIX 模拟(与 SPY 负相关)
    vix = 20 * (1 + np.random.uniform(-0.3, 0.3, len(dates)))
    vix[returns < -0.01] += 5   # 下跌日 VIX 跳升
    vix = np.clip(vix, 10, 60)

    df = pd.DataFrame({
        "date": dates,
        "SPY": price,
        "VIX": vix,
    })
    return df


if __name__ == "__main__":
    # ── 数据加载 ─────────────────────────────────────────────────────
    df = build_example_dataset()

    # ── 构造带深度信息的 MarketEvent ───────────────────────────────
    engine = BacktestEngine(
        initial_cash=50_000.0,
        commission_rate=0.0005,
        matching_engine=RealisticMatch(volume_participation_rate=0.05, slippage_bps=5.0),
    )

    strategy = VolatilityRegimeStrategy(
        engine=engine,
        vix_symbol="VIX.IDX",
        target_symbol="SPY.US",
    )

    # ── 注入市场事件 ─────────────────────────────────────────────────
    for _, row in df.iterrows():
        ts = row["date"].to_pydatetime().replace(tzinfo=None)
        spy_close = row["SPY"]
        spread = (row["VIX"] - 20) / 100
        bid = spy_close * (1 - spread)
        ask = spy_close * (1 + spread)
        volume = np.random.uniform(10_000_000, 50_000_000)

        engine.emit(MarketEvent(
            timestamp=ts,
            symbol="SPY.US",
            close=spy_close,
            bid=bid, ask=ask,
            bid_volume=volume * 0.4,
            ask_volume=volume * 0.4,
            volume=volume,
        ))
        engine.emit(MarketEvent(
            timestamp=ts,
            symbol="VIX.IDX",
            close=row["VIX"],
            bid=row["VIX"] * 0.999, ask=row["VIX"] * 1.001,
            bid_volume=1, ask_volume=1,
            volume=1,
        ))

        engine.record_equity(ts, spy_close)

    # ── 运行回测 ─────────────────────────────────────────────────────
    engine.run()

    # ── 输出结果 ─────────────────────────────────────────────────────
    equity_curve = pd.DataFrame(engine._portfolio_value_series,
                                 columns=["timestamp", "equity"])
    equity_curve.set_index("timestamp", inplace=True)
    equity_curve["returns"] = equity_curve["equity"].pct_change().fillna(0)

    total_return = (engine.get_portfolio_value() - engine.initial_cash) / engine.initial_cash
    sharpe = equity_curve["returns"].mean() / equity_curve["returns"].std() * np.sqrt(252)
    max_dd = (equity_curve["equity"].cummax() - equity_curve["equity"]).max()
    max_dd_pct = max_dd / engine.initial_cash

    print("=" * 50)
    print("回测结果摘要")
    print("=" * 50)
    print(f"初始资金       : ${engine.initial_cash:,.2f}")
    print(f"最终权益       : ${engine.get_portfolio_value():,.2f}")
    print(f"总收益率       : {total_return:.2%}")
    print(f"年化夏普比率   : {sharpe:.2f}")
    print(f"最大回撤       : ${max_dd:,.2f} ({max_dd_pct:.2%})")

运行输出:

==================================================
回测结果摘要
==================================================
初始资金       : $50,000.00
最终权益       : $53,247.33
总收益率       : 6.49%
年化夏普比率   : 1.14
最大回撤       : $2,103.45 (4.21%)

策略简单,数据是模拟的,结果本身没有参考价值。重要的是框架本身跑通了完整的信号→订单→撮合→持仓→权益记录链路。


六、框架的三个扩展方向

6.1 滑点模型:不要用固定值

上面的撮合引擎用了 slippage_bps = 5bp 的固定滑点,这只是教学级近似。真实滑点取决于:

  • 订单大小相对于市场深度:买 100 万的股票和买 1000 万的股票,滑点不是一个数量级
  • 流动性状态:低流动性时段(开盘前 15 分钟、收盘前 30 分钟)买卖价差天然更宽
  • 标的特性:大盘股(SPY)和微型股(penny stock)的滑点模型完全不同

一个更合理的滑点函数:

def dynamic_slippage(order_qty: float, market: MarketEvent,
                     liquidity_factor: float = 1.0) -> float:
    """
    动态滑点估算。
    滑点 = (订单量 / 档位深度) × 档位价差 × 流动性因子
    """
    if market.bid_volume == 0:
        return 0.0

    depth_ratio = min(order_qty / market.bid_volume, 5.0)  # 最多穿透 5 层
    spread = market.ask - market.bid
    base_slippage = depth_ratio * spread / 2
    return base_slippage * liquidity_factor

6.2 数据源:从 CSV 到 TickDB

上述示例使用了模拟数据,生产中需要接入真实历史数据。TickDB 的 /kline 接口覆盖多个市场、10 年级别的历史 K 线数据,适合跨周期策略回测:

import os
import requests

def fetch_historical_klines(symbol: str, interval: str = "1d",
                             start_time: int = None, limit: int = 500):
    """
    从 TickDB 获取历史 K 线数据。
    start_time: Unix 时间戳(毫秒)
    limit: 最大 1000
    """
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")

    params = {"symbol": symbol, "interval": interval, "limit": limit}
    if start_time:
        params["start_time"] = start_time

    response = requests.get(
        "https://api.tickdb.ai/v1/market/kline",
        headers={"X-API-Key": api_key},
        params=params,
        timeout=(3.05, 10),
    )
    response.raise_for_status()
    data = response.json()

    if data.get("code") != 0:
        raise RuntimeError(f"API 错误 {data.get('code')}: {data.get('message')}")

    return pd.DataFrame(data["data"])

调用时传入时间范围即可获取用于回测的历史数据:

# 获取 SPY 过去 3 年的日线数据
spy_data = fetch_historical_klines(
    symbol="SPY.US",
    interval="1d",
    start_time=int((pd.Timestamp.now() - pd.DateOffset(years=3)).timestamp() * 1000),
    limit=1000,
)

注意:TickDB 的 /kline 接口返回的是已结束周期的历史数据,用于回测是恰当的。如果需要实时数据流(如模拟盘前/盘后事件),则需要通过 WebSocket 接口订阅 kline 频道。

6.3 并行回测:榨干多核

单标的回测是串行的,但多标的策略(如配对交易、多因子选股)可以按标的级别并行:

from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List


def parallel_backtest(symbols: List[str],
                       strategy_class: type,
                       start_date: str,
                       end_date: str,
                       initial_cash: float = 100_000.0,
                       max_workers: int = 4) -> dict:
    """
    多标的并行回测。
    每个进程独立运行一个标的的回测,主进程汇总结果。
    ⚠️ 使用 ProcessPool 而非 ThreadPool:规避 GIL,适合 CPU 密集的回测计算。
    """

    def run_single(symbol: str) -> dict:
        engine = BacktestEngine(initial_cash=initial_cash)
        # ... 初始化策略 ...
        engine.run()
        return {
            "symbol": symbol,
            "final_equity": engine.get_portfolio_value(),
            "trades": len(engine._orders),
        }

    results = {}
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(run_single, sym): sym for sym in symbols}
        for future in as_completed(futures):
            symbol = futures[future]
            results[symbol] = future.result()

    return results

七、框架的可信度边界

任何回测引擎都有它"能回答"和"不能回答"的问题:

能回答 不能回答
策略在历史价格序列上的盈亏 未来会不会重复同样的模式
不同撮合规则对收益的影响 某个具体订单能否在某个精确价格成交
因子有效性、相关性、波动率特征 政策变化、黑天鹅事件的冲击
多策略组合的相关性 资金规模变化后策略是否仍然有效

最后一条特别重要:资金规模越大,订单对市场的冲击越大。回测引擎用固定滑点只能近似这个过程,真实冲击成本需要用 市场冲击模型(如 Almgren-Chriss 模型)来估算,而这已经超出了通用回测框架的范畴。

回测引擎的意义不是预测未来,而是排除那些在逻辑上和数据上已经不可能盈利的策略。一个经过严格回测仍然存活下来的策略,至少不会在基本的交易机制上犯低级错误。


下一步行动

如果你在设计自己的回测系统:本文的 EventQueueMatchingEngineOrderStateMachine 三个模块可以直接复用。重点关注撮合引擎与策略逻辑的解耦程度——耦合越紧,换一个撮合规则就要重写整个策略。

如果你更关心数据而非引擎:策略再精密,输入的是垃圾数据,输出的也是垃圾。TickDB 提供 10 年级别的历史 K 线数据(覆盖美股、港股、数字货币等多个市场),配合 /kline 接口可以一次性拉取大时间跨度的清洗数据:

import os, requests
headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
# 访问 tickdb.ai 注册后,在控制台生成 API Key

如果你想在 AI 助手中快速验证策略思路:在 ClawHub 搜索 tickdb-market-data SKILL,用自然语言描述策略逻辑,AI 助手可以帮你生成回测代码框架并接入 TickDB 数据。


风险提示:本文不构成任何投资建议。回测结果基于历史数据,历史表现不代表未来收益。策略在实际交易中面临滑点、流动性冲击、监管政策变化等多重不确定性因素,请谨慎评估后决策。