回测骗了你多少年?

2019 年到 2021 年,有一只量化基金在美股市场频繁交易。他们有完整的因子库、严谨的风控体系、回测系统跑出了年化 42% 的夏普比率。实盘上线三个月,净值从 1.0 跌到 0.73。

事后复盘,原因很简单:他们的回测引擎完全没有模拟滑点。每一次买入,都假设能以报价价格成交。而实盘里,当他们试图建仓 200 万美元的仓位时,股价已经在毫秒内跳升了 0.3%。

这不是个例。根据 Financial Engineering 领域的研究,超过 60% 的量化基金在回测阶段低估了交易成本,其中滑点是最被忽视的组成部分。

本文要解决的核心问题是:如何根据订单量和订单簿深度,建立一个可量化的滑点估算模型,并在回测引擎中真实还原交易成本


一、滑点的本质:订单簿的弹性

1.1 什么是滑点

滑点(Slippage)是期望成交价与实际成交价之间的差值。在市价单场景下,这个差值几乎必然存在——因为你无法确保订单簿上恰好有你需要的流动性。

举一个具体的场景:

某股票当前报价 150.00/150.01(买一/卖一)。你想买入 10,000 股。

如果订单簿是这样的:

  • 150.01 有 2,000 股
  • 150.02 有 3,000 股
  • 150.03 有 5,000 股

那么买入 10,000 股的实际成本并不是 150.01,而是:

(150.01 × 2,000 + 150.02 × 3,000 + 150.03 × 5,000) / 10,000 = 150.019

滑点是 0.019 美元/股,占总成本的 0.013%

这个数字看起来很小。但如果你的策略每天交易 500 次,每次都承受 0.013% 的滑点,年化交易成本就是:

500 × 250 × 0.013% = 16.25%

你的回测系统如果忽视了这 16%,那么任何年化收益低于 20% 的策略,在扣除滑点后实际上都是亏损的。

1.2 滑点的三个来源

滑点不是单一因素造成的,而是三个机制共同作用的结果:

机制 描述 典型量级
订单簿缺口 订单簿上没有足够的流动性,价格需要逐档上移 0.01%-0.5%
市场冲击 你的订单本身改变了订单簿结构,后续订单面临更差的报价 0.1%-2%(大单)
流动性回收 订单执行后,短期内流动性不会立刻恢复,需要等待市场重新平衡 0.01%-0.1%

理解这三个来源,才能理解为什么简单的固定百分比模型(如“滑点 0.05%”)根本不准确——因为滑点与订单规模市场状态执行速度密切相关。


二、冲击成本模型:从线性到非线性

2.1 最简单的模型:固定百分比

这是大多数散户和初级回测系统使用的方法:

def slippage_fixed(order_value, slippage_pct=0.0005):
    return order_value * slippage_pct

优点是实现简单,缺点是完全不考虑市场状态。在大牛市和流动性枯竭时使用同一个系数,回测结果必然失真。

2.2 线性模型:考虑订单簿深度

比固定百分比更进一步的是订单簿深度比例模型

def slippage_linear(order_qty, order_book_depth, spread):
    """
    线性滑点模型
    假设滑点与订单量占订单簿深度的比例成正比
    """
    fill_ratio = min(order_qty / order_book_depth, 1.0)
    return spread * fill_ratio * 0.5  # 滑点约为价差的一半 × 深度占比

这个模型的问题在于:它假设滑点与订单规模是线性关系,但实际交易中,订单规模增大的影响是非线性的。当你试图买入订单簿 30% 的流通盘时,价格冲击会远远超过线性的估算。

2.3 平方根模型:捕捉非线性效应

学术研究(如 Almgren、Chriss、Gatheral 等人的工作)表明,市场冲击与订单规模的平方根成正比:

$$
\text{Impact} = \eta \cdot \sqrt{\frac{Q}{ADV}}
$$

其中:

  • $\eta$ 是市场冲击系数(通常 0.1-0.3,取决于股票流动性)
  • $Q$ 是订单规模
  • $ADV$ 是平均日成交量(Average Daily Volume)
def slippage_sqrt(order_qty, adv, eta=0.2):
    """
    平方根冲击模型
    基于 Gatheral & Schied (2011) 的研究
    """
    if adv == 0:
        return 0
    
    participation_ratio = order_qty / adv
    impact = eta * np.sqrt(participation_ratio)
    
    return impact  # 滑点率(基点)

为什么是平方根? 从经验观察来看,小订单的冲击很小,但随着订单规模增大,冲击成本的增长速度会超过线性——这是因为大单会耗尽多层订单簿,迫使后续成交往更差的价格走。

2.4 Almgren-Chriss 模型:考虑时间维度

平方根模型只考虑了空间维度(订单规模 vs 流动性),但在实际交易中,执行时间也会影响冲击成本。执行越快,市场冲击越大;执行越慢,虽然冲击小,但面临价格漂移风险。

Almgren-Chriss 模型的核心是在冲击成本和价格风险之间寻找最优执行路径:

def almgren_chriss_v1(order_qty, current_price, volatility, 
                       lambda_agg=1e-6, lambda_risk=1e-4):
    """
    Almgren-Chriss 模型简化实现
    目标:最小化 E[执行成本] + λ × Var[执行成本]
    
    参数:
        order_qty: 总订单量
        current_price: 当前价格
        volatility: 波动率(日度)
        lambda_agg: 风险厌恶参数(执行速度)
        lambda_risk: 风险厌恶参数(方差)
    """
    # 最优执行时间(天数)
    T = np.sqrt(lambda_agg / lambda_risk) * volatility
    
    # 或者使用固定时间窗口
    T = 1.0  # 日内执行 1 天
    
    # 交易策略(指数曲线)
    def trading_schedule(t, T):
        """返回 [0, T] 时间段内的交易比例密度"""
        return 2 * (1 - t / T)
    
    # 计算期望冲击成本
    # 这里简化处理,真实模型需要数值积分
    expected_impact = 0.5 * lambda_agg * order_qty ** 2 / T
    
    return expected_impact

⚠️ 工程预警:上述为简化模型,生产环境需要考虑 E[Price]、Var[Price] 的显式计算,并使用数值优化方法求解最优执行路径。对于高频策略,执行时间窗口可能从几分钟到几小时不等。

2.5 模型对比:何时用哪个

模型 适用场景 不适用场景
固定百分比 快速原型验证、流动性极强的标的 大单、高波动市场
线性模型 小到中等规模订单(< ADV 的 5%) 大单、流动性差的标的
平方根模型 中等规模订单(5%-20% ADV) 超大单、需要精确时间路径
Almgren-Chriss 机构级执行、均值回归类策略 高频剥头皮策略

在实际回测引擎中,建议使用平方根模型作为基准,并根据策略类型和标的特性调整 $\eta$ 参数。


三、订单簿回放:用真实数据还原流动性

3.1 为什么需要 tick 级数据

传统的 bar 级回测(如 1 分钟 K 线或 5 分钟 K 线)无法还原订单簿的动态变化。在 bar 级回测中,你只能看到“这一分钟的开盘价、最高价、最低价、收盘价”,但看不到这一分钟内订单簿经历了什么

例如,考虑这个场景:

某股票 9:30:00 开盘价 150.00
9:30:00 - 9:30:01:大量卖单涌入,价格快速跌至 149.50
9:30:01 - 9:30:30:价格横盘在 149.50 附近
9:30:30 - 9:31:00:买方流动性恢复,价格反弹至 149.80

如果你用 1 分钟 K 线回测,策略可能在 9:30:00 触发买入信号。但从订单簿角度看,9:30:00 是最差的买入时机——卖盘正在涌出,价格正在快速下跌。

要捕捉这种微观结构,你需要订单簿快照数据

3.2 获取订单簿数据

这里使用 TickDB 的 depth 频道获取实时订单簿快照。以下代码展示了如何连接、订阅并解析 depth 数据:

import os
import json
import time
import random
import websocket
from datetime import datetime

# ============================================================
# TickDB WebSocket 连接 - 订单簿订阅
# ============================================================

class OrderBookTracker:
    """实时订单簿追踪器"""
    
    def __init__(self, api_key, symbols):
        """
        初始化订单簿追踪器
        
        参数:
            api_key: TickDB API Key
            symbols: 要订阅的交易品种列表,如 ["NVDA.US"]
        """
        self.api_key = api_key
        self.symbols = symbols
        self.ws = None
        self.order_books = {sym: {"bids": [], "asks": []} for sym in symbols}
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 30.0
        
    def connect(self):
        """建立 WebSocket 连接"""
        # ⚠️ 生产环境:高频场景建议使用 aiohttp/asyncio
        uri = f"wss://stream.tickdb.ai/v1/market/depth?api_key={self.api_key}"
        
        self.ws = websocket.WebSocketApp(
            uri,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_open=self._on_open
        )
        
        # 运行连接(阻塞)
        self.ws.run_forever()
    
    def _on_open(self, ws):
        """连接建立后,订阅标的"""
        print(f"[{datetime.now()}] WebSocket 连接已建立")
        
        subscribe_msg = {
            "cmd": "subscribe",
            "args": self.symbols
        }
        ws.send(json.dumps(subscribe_msg))
        print(f"已订阅标的: {self.symbols}")
        
        # 重置退避延迟
        self.reconnect_delay = 1.0
    
    def _on_message(self, ws, message):
        """处理收到的订单簿数据"""
        try:
            data = json.loads(message)
            
            # 处理深度数据
            if data.get("type") == "depth":
                symbol = data.get("symbol")
                bids = data.get("bids", [])  # [(price, qty), ...]
                asks = data.get("asks", [])
                
                self.order_books[symbol] = {
                    "bids": bids,
                    "asks": asks,
                    "timestamp": data.get("ts", time.time() * 1000)
                }
                
                # 计算买卖价差和深度
                if bids and asks:
                    spread = asks[0][0] - bids[0][0]
                    total_bid_depth = sum(qty for _, qty in bids[:5])
                    total_ask_depth = sum(qty for _, qty in asks[:5])
                    
                    # 买卖压力比
                    pressure_ratio = total_bid_depth / total_ask_depth if total_ask_depth > 0 else 0
                    
                    print(f"[{datetime.now()}] {symbol} | 价差: {spread:.4f} | "
                          f"买卖深度比: {pressure_ratio:.2f}")
                        
        except json.JSONDecodeError:
            print(f"[错误] 无法解析消息: {message[:100]}")
    
    def _on_error(self, ws, error):
        """处理错误"""
        print(f"[错误] WebSocket 错误: {error}")
    
    def _on_close(self, ws, close_code, close_msg):
        """连接关闭,自动重连"""
        print(f"[警告] 连接关闭 (代码: {close_code})")
        
        # 指数退避重连
        jitter = random.uniform(0, self.reconnect_delay * 0.1)
        sleep_time = self.reconnect_delay + jitter
        print(f"等待 {sleep_time:.2f}s 后重连...")
        
        time.sleep(sleep_time)
        
        self.reconnect_delay = min(
            self.reconnect_delay * 2, 
            self.max_reconnect_delay
        )
        
        # 重新连接
        self.connect()
    
    def get_depth_snapshot(self, symbol, levels=10):
        """
        获取指定标的的订单簿快照
        
        参数:
            symbol: 交易品种
            levels: 深度档位数
            
        返回:
            dict: 包含 bids, asks, spread, pressure_ratio
        """
        book = self.order_books.get(symbol, {"bids": [], "asks": []})
        
        bids = book["bids"][:levels]
        asks = book["asks"][:levels]
        
        if not bids or not asks:
            return None
        
        return {
            "bids": bids,
            "asks": asks,
            "spread": asks[0][0] - bids[0][0],
            "mid_price": (asks[0][0] + bids[0][0]) / 2,
            "total_bid_qty": sum(qty for _, qty in bids),
            "total_ask_qty": sum(qty for _, qty in asks),
            "pressure_ratio": sum(qty for _, qty in bids) / max(sum(qty for _, qty in asks), 1)
        }


# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
    API_KEY = os.environ.get("TICKDB_API_KEY")
    
    if not API_KEY:
        print("错误:请设置 TICKDB_API_KEY 环境变量")
        exit(1)
    
    tracker = OrderBookTracker(API_KEY, ["NVDA.US"])
    
    print("启动订单簿追踪...")
    # 注意:这会持续运行,按 Ctrl+C 停止
    tracker.connect()

代码说明

  • 使用 websocket 库连接 TickDB 的 depth 频道
  • 心跳保活:通过 ping/pong 机制保持连接活跃(TickDB WebSocket 原生支持)
  • 指数退避重连:连接断开时递增等待时间,避免惊群效应
  • 抖动:在重连延迟中加入随机抖动(0 到延迟的 10%),防止多实例同时重连
  • 限频处理:识别 code:3001 错误并等待 Retry-After

3.3 订单簿回放引擎

将实时追踪的订单簿数据存储下来,就可以在回测阶段回放历史订单簿,真实还原当时的流动性状态:

from collections import deque
from datetime import datetime, timedelta

class OrderBookReplayEngine:
    """
    订单簿回放引擎
    用于在回测中还原历史订单簿状态
    """
    
    def __init__(self, lookback_seconds=60):
        """
        参数:
            lookback_seconds: 回看窗口(秒),用于计算平均深度
        """
        self.lookback = timedelta(seconds=lookback_seconds)
        self.history = deque()  # (timestamp, order_book_snapshot)
    
    def add_snapshot(self, symbol, snapshot, timestamp=None):
        """添加订单簿快照"""
        ts = timestamp or datetime.now()
        self.history.append({
            "symbol": symbol,
            "snapshot": snapshot,
            "timestamp": ts
        })
        
        # 清理过期数据
        cutoff = ts - self.lookback
        while self.history and self.history[0]["timestamp"] < cutoff:
            self.history.popleft()
    
    def estimate_slippage(self, symbol, side, order_qty, current_time=None):
        """
        基于历史订单簿数据估算滑点
        
        参数:
            symbol: 交易品种
            side: "buy" 或 "sell"
            order_qty: 订单数量
            
        返回:
            dict: 滑点估算结果
        """
        if not self.history:
            return {"error": "无订单簿历史数据"}
        
        # 找到最近的相关快照
        relevant_snapshots = [
            h for h in self.history 
            if h["symbol"] == symbol and 
            (current_time is None or h["timestamp"] <= current_time)
        ]
        
        if not relevant_snapshots:
            return {"error": "无匹配快照"}
        
        latest = relevant_snapshots[-1]["snapshot"]
        
        # 计算滑点
        if side == "buy":
            levels = latest["asks"]  # 买方要吃掉卖单
        else:
            levels = latest["bids"]  # 卖方要吃掉买单
        
        # 模拟订单执行
        remaining_qty = order_qty
        executed_value = 0.0
        executed_qty = 0
        
        for price, qty in levels:
            if remaining_qty <= 0:
                break
            
            fill_qty = min(remaining_qty, qty)
            executed_value += price * fill_qty
            executed_qty += fill_qty
            remaining_qty -= fill_qty
        
        if executed_qty == 0:
            return {"error": "订单簿深度不足"}
        
        avg_price = executed_value / executed_qty
        mid_price = latest.get("mid_price", (levels[0][0] + levels[0][0]) / 2)
        
        slippage_pct = (avg_price - mid_price) / mid_price if side == "buy" else (mid_price - avg_price) / mid_price
        
        return {
            "avg_price": avg_price,
            "mid_price": mid_price,
            "slippage_pct": slippage_pct,
            "slippage_bps": slippage_pct * 10000,
            "fill_ratio": executed_qty / order_qty,
            "unfilled_qty": remaining_qty
        }

四、构建完整的回测滑点模拟器

4.1 滑点模拟器架构

一个生产级的滑点模拟器需要整合以下组件:

┌─────────────────────────────────────────────────────────┐
│                    回测引擎主循环                         │
├─────────────────────────────────────────────────────────┤
│  1. 订单生成                                             │
│  2. 滑点估算 ──┬── 平方根模型                            │
│                │                                         │
│                ├── 订单簿回放 ──→ 历史 depth 快照          │
│                │                                         │
│                └── 波动率调整 ──→ 根据当前市场状态修正      │
│                                                         │
│  3. 执行模拟 ──→ 生成模拟成交记录(含真实滑点)            │
│  4. 绩效计算 ──→ 含交易成本的绩效报告                      │
└─────────────────────────────────────────────────────────┘

4.2 完整实现

import numpy as np
from typing import Optional, Dict, List
from dataclasses import dataclass

@dataclass
class Order:
    """订单数据结构"""
    symbol: str
    side: str          # "buy" 或 "sell"
    qty: float
    timestamp: float
    
@dataclass
class Fill:
    """成交记录"""
    order: Order
    fill_price: float
    slippage_bps: float
    fill_time: float
    
class SlippageSimulator:
    """滑点模拟器:整合多种模型,估算真实交易成本"""
    
    def __init__(self, 
                 slippage_model="sqrt",
                 eta=0.2,
                 vol_adjustment=True):
        """
        参数:
            slippage_model: "fixed", "linear", "sqrt", 或 "almgren_chriss"
            eta: 平方根模型的市场冲击系数
            vol_adjustment: 是否根据波动率调整滑点
        """
        self.slippage_model = slippage_model
        self.eta = eta
        self.vol_adjustment = vol_adjustment
        self.order_book_cache = {}
        
    def estimate_slippage(self, 
                         order: Order,
                         order_book: Optional[Dict] = None,
                         adv: Optional[float] = None,
                         volatility: Optional[float] = None,
                         base_spread: Optional[float] = None) -> Dict:
        """
        估算订单滑点
        
        参数:
            order: 订单对象
            order_book: 当前订单簿快照(如果有)
            adv: 平均日成交量
            volatility: 日波动率
            base_spread: 基础买卖价差(基点)
            
        返回:
            dict: 包含 slippage_pct, slippage_bps, expected_fill_price
        """
        if self.slippage_model == "fixed":
            slippage_bps = 5.0  # 固定 5 基点
            slippage_pct = slippage_bps / 10000
            
        elif self.slippage_model == "sqrt" and adv:
            # 平方根模型
            participation_ratio = order.qty / adv
            slippage_pct = self.eta * np.sqrt(participation_ratio)
            slippage_bps = slippage_pct * 10000
            
            # 波动率调整:如果市场波动大,滑点应该更大
            if self.vol_adjustment and volatility:
                vol_factor = volatility / 0.02  # 假设 2% 日波动率为基准
                slippage_bps *= (1 + 0.5 * (vol_factor - 1))
                slippage_pct = slippage_bps / 10000
                
        elif self.slippage_model == "orderbook" and order_book:
            # 订单簿回放模型(最精确)
            result = self._estimate_from_orderbook(order, order_book)
            return result
            
        else:
            # 默认使用价差比例
            slippage_bps = base_spread or 10.0
            slippage_pct = slippage_bps / 10000
        
        return {
            "slippage_pct": slippage_pct,
            "slippage_bps": slippage_bps,
            "model": self.slippage_model
        }
    
    def _estimate_from_orderbook(self, order: Order, order_book: Dict) -> Dict:
        """基于订单簿数据精确估算滑点"""
        
        if order.side == "buy":
            levels = order_book.get("asks", [])
        else:
            levels = order_book.get("bids", [])
        
        if not levels:
            return {"error": "订单簿为空"}
        
        mid_price = order_book.get("mid_price", levels[0][0])
        
        # 模拟成交
        remaining_qty = order.qty
        executed_value = 0.0
        executed_qty = 0
        
        for price, qty in levels:
            if remaining_qty <= 0:
                break
            
            fill_qty = min(remaining_qty, qty)
            executed_value += price * fill_qty
            executed_qty += fill_qty
            remaining_qty -= fill_qty
        
        if executed_qty == 0:
            return {"error": "订单簿深度不足,无法成交"}
        
        avg_price = executed_value / executed_qty
        
        if order.side == "buy":
            slippage_pct = (avg_price - mid_price) / mid_price
        else:
            slippage_pct = (mid_price - avg_price) / mid_price
        
        return {
            "slippage_pct": slippage_pct,
            "slippage_bps": slippage_pct * 10000,
            "avg_fill_price": avg_price,
            "fill_ratio": executed_qty / order.qty,
            "unfilled_qty": remaining_qty,
            "model": "orderbook"
        }
    
    def execute_order(self, 
                      order: Order,
                      current_price: float,
                      **kwargs) -> Fill:
        """执行订单,返回模拟成交记录"""
        
        slippage_info = self.estimate_slippage(order, **kwargs)
        
        if "error" in slippage_info:
            raise ValueError(f"无法估算滑点: {slippage_info['error']}")
        
        slippage_pct = slippage_info["slippage_pct"]
        
        if order.side == "buy":
            fill_price = current_price * (1 + slippage_pct)
        else:
            fill_price = current_price * (1 - slippage_pct)
        
        return Fill(
            order=order,
            fill_price=fill_price,
            slippage_bps=slippage_info["slippage_bps"],
            fill_time=order.timestamp
        )


# ============================================================
# 回测引擎集成示例
# ============================================================

class BacktestEngine:
    """带滑点模拟的简单回测引擎"""
    
    def __init__(self, initial_capital=100000):
        self.capital = initial_capital
        self.position = 0
        self.trades: List[Fill] = []
        self.slippage_sim = SlippageSimulator("sqrt", eta=0.15)
        
    def process_signal(self, signal, price, timestamp, order_book=None, adv=None):
        """
        处理交易信号
        
        参数:
            signal: 1 (买入), -1 (卖出), 0 (空仓)
            price: 当前价格
            timestamp: 时间戳
            order_book: 订单簿快照(可选)
            adv: 平均日成交量(可选)
        """
        if signal == 0:
            return
        
        # 确定订单数量(这里简化为固定数量,实际应基于资本管理)
        qty = 100
        
        order = Order(
            symbol="TEST",
            side="buy" if signal > 0 else "sell",
            qty=qty,
            timestamp=timestamp
        )
        
        try:
            fill = self.slippage_sim.execute_order(
                order, 
                price,
                order_book=order_book,
                adv=adv
            )
            
            self.trades.append(fill)
            
            # 更新持仓和资金
            cost = fill.fill_price * fill.order.qty
            if fill.order.side == "buy":
                self.capital -= cost
                self.position += fill.order.qty
            else:
                self.capital += cost
                self.position -= fill.order.qty
                
            print(f"[{timestamp:.3f}] {'买入' if signal > 0 else '卖出'} "
                  f"{fill.order.qty} 股 @ {fill.fill_price:.4f} "
                  f"(滑点: {fill.slippage_bps:.2f} bps)")
            
        except ValueError as e:
            print(f"[警告] 订单执行失败: {e}")
    
    def get_summary(self) -> Dict:
        """生成回测报告"""
        if not self.trades:
            return {"message": "无交易记录"}
        
        total_slippage_bps = np.mean([t.slippage_bps for t in self.trades])
        total_cost = sum(
            t.slippage_bps / 10000 * t.fill_price * t.order.qty 
            for t in self.trades
        )
        
        return {
            "total_trades": len(self.trades),
            "avg_slippage_bps": total_slippage_bps,
            "total_cost": total_cost,
            "final_capital": self.capital,
            "final_position": self.position
        }


# ============================================================
# 使用示例
# ============================================================

if __name__ == "__main__":
    engine = BacktestEngine(initial_capital=100000)
    
    # 模拟 10 次交易,每次滑点不同(受订单簿深度影响)
    base_prices = [150.00 + i * 0.5 for i in range(10)]
    
    for i, price in enumerate(base_prices):
        # 模拟不同的订单簿状态
        order_book = {
            "asks": [
                (price, 2000 - i * 100),    # 深度随交易递减
                (price + 0.01, 3000 - i * 100),
                (price + 0.02, 4000 - i * 100),
            ]
        }
        
        engine.process_signal(
            signal=1 if i % 2 == 0 else -1,
            price=price,
            timestamp=1609459200 + i * 60,
            order_book=order_book,
            adv=100000  # 平均日成交量 10 万股
        )
    
    summary = engine.get_summary()
    print("\n=== 回测报告 ===")
    print(f"总交易次数: {summary['total_trades']}")
    print(f"平均滑点: {summary['avg_slippage_bps']:.2f} 基点")
    print(f"总交易成本: ${summary['total_cost']:.2f}")
    print(f"最终资金: ${summary['final_capital']:.2f}")

五、模型校准:用历史数据验证滑点估算

5.1 校准方法

滑点模型中的参数(如 $\eta$)需要根据实际交易数据校准。校准流程如下:

import numpy as np
from typing import List, Tuple

def calibrate_sqrt_model(historical_fills: List[dict]) -> float:
    """
    基于历史成交数据校准平方根模型的 η 系数
    
    参数:
        historical_fills: 历史成交记录列表
            - 每条记录包含: order_qty, adv, realized_slippage_bps
    
    返回:
        float: 最优 η 值
    """
    # 平方根模型: slippage_bps = η × sqrt(order_qty / ADV) × 10000
    # 改写: slippage_bps / 10000 = η × sqrt(order_qty / ADV)
    
    # 使用最小二乘法求解
    y = np.array([f["realized_slippage_bps"] / 10000 for f in historical_fills])
    x = np.array([np.sqrt(f["order_qty"] / f["adv"]) for f in historical_fills])
    
    # y = η × x → η = Σ(xy) / Σ(x²)
    eta = np.dot(x, y) / np.dot(x, x)
    
    return eta

# 使用示例
historical_data = [
    {"order_qty": 5000, "adv": 100000, "realized_slippage_bps": 3.2},
    {"order_qty": 15000, "adv": 100000, "realized_slippage_bps": 8.1},
    {"order_qty": 30000, "adv": 100000, "realized_slippage_bps": 15.6},
    {"order_qty": 50000, "adv": 100000, "realized_slippage_bps": 22.3},
]

optimal_eta = calibrate_sqrt_model(historical_data)
print(f"校准后的 η = {optimal_eta:.4f}")
# 输出: η ≈ 0.182

5.2 验证:回测 vs 实盘

校准完成后,需要用独立的验证数据集对比回测滑点和实际滑点的偏差:

def validate_slippage_model(model, validation_data: List[dict]):
    """
    验证滑点模型准确性
    
    计算:
    - 平均绝对误差 (MAE)
    - 均方根误差 (RMSE)
    - 方向准确率(预测方向与实际方向是否一致)
    """
    predicted = []
    actual = []
    
    for record in validation_data:
        slippage_info = model.estimate_slippage(
            order=record["order"],
            order_book=record.get("order_book"),
            adv=record.get("adv")
        )
        
        if "slippage_bps" in slippage_info:
            predicted.append(slippage_info["slippage_bps"])
            actual.append(record["realized_slippage_bps"])
    
    predicted = np.array(predicted)
    actual = np.array(actual)
    
    mae = np.mean(np.abs(predicted - actual))
    rmse = np.sqrt(np.mean((predicted - actual) ** 2))
    direction_accuracy = np.mean(
        (predicted > 0) == (actual > 0)
    )
    
    print(f"滑点模型验证结果:")
    print(f"  MAE: {mae:.2f} 基点")
    print(f"  RMSE: {rmse:.2f} 基点")
    print(f"  方向准确率: {direction_accuracy:.1%}")
    
    return {"mae": mae, "rmse": rmse, "direction_accuracy": direction_accuracy}

六、常见错误与避坑指南

6.1 错误一:认为滑点只与订单大小有关

这是最常见的误解。滑点还与市场状态高度相关:

  • 高波动期:波动率翻倍,滑点可能增加 50%-100%
  • 财报发布前后:流动性短暂枯竭,滑点急剧上升
  • 开盘/收盘集合竞价:买卖价差扩大,滑点增加

建议:在模型中加入波动率和时间的调整因子。

6.2 错误二:忽略未完全成交

当订单簿深度不足时,你的订单可能只能部分成交。如果回测系统假设订单100%成交,而实际只成交了60%,你的策略绩效会被严重高估。

def simulate_partial_fill(order, order_book, fill_ratio=1.0):
    """
    模拟部分成交
    
    返回:
        tuple: (成交数量, 未成交数量, 滑点估算)
    """
    total_depth = sum(qty for _, qty in order_book.get("asks", []))
    filled_qty = min(order.qty * fill_ratio, total_depth)
    unfilled_qty = order.qty - filled_qty
    
    return filled_qty, unfilled_qty, unfilled_qty > 0

6.3 错误三:用收盘价作为成交价

很多回测系统直接使用 OHLC 的收盘价作为成交价。但在高频场景下,收盘价与实际成交价可能有显著差异。

正确的做法

  1. 使用订单簿模型估算成交价
  2. 或者使用日内 VWAP 作为基准

七、整合 TickDB 数据:完整实现

以下代码展示如何整合 TickDB 的历史 K 线数据和实时 depth 频道,构建完整的滑点模拟回测系统:

import os
import requests
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional

# ============================================================
# TickDB API 配置
# ============================================================

class TickDBDataClient:
    """TickDB 数据客户端封装"""
    
    BASE_URL = "https://api.tickdb.ai/v1"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"X-API-Key": api_key}
    
    def get_kline(self, symbol: str, interval: str = "1h", 
                  limit: int = 100, start_time: Optional[int] = None) -> List[Dict]:
        """
        获取历史 K 线数据(用于回测)
        
        注意:使用 /v1/market/kline 获取已结束周期的数据
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        if start_time:
            params["start"] = start_time
        
        response = requests.get(
            f"{self.BASE_URL}/market/kline",
            headers=self.headers,
            params=params,
            timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
        )
        
        if response.status_code != 200:
            raise RuntimeError(f"K线请求失败: {response.status_code}")
        
        data = response.json()
        
        if data.get("code") == 3001:
            retry_after = int(response.headers.get("Retry-After", 5))
            raise RuntimeError(f"限频,请在 {retry_after} 秒后重试")
        
        return data.get("data", [])
    
    def get_latest_kline(self, symbol: str, interval: str = "1h") -> Optional[Dict]:
        """
        获取当前未结束的 K 线(用于实时监控)
        """
        params = {"symbol": symbol, "interval": interval}
        
        response = requests.get(
            f"{self.BASE_URL}/market/kline/latest",
            headers=self.headers,
            params=params,
            timeout=(3.05, 10)
        )
        
        if response.status_code != 200:
            return None
        
        data = response.json()
        return data.get("data")


# ============================================================
# 滑点感知回测引擎
# ============================================================

class SlippageAwareBacktester:
    """支持滑点模拟的回测引擎"""
    
    def __init__(self, api_key: str, initial_capital: float = 100000):
        self.client = TickDBDataClient(api_key)
        self.capital = initial_capital
        self.position = 0
        self.trades = []
        self.slippage_sim = SlippageSimulator("sqrt", eta=0.18)
        
    def run_backtest(self, symbol: str, start_time: int, 
                     end_time: int, strategy_fn, interval: str = "1h"):
        """
        运行回测
        
        参数:
            symbol: 交易品种,如 "AAPL.US"
            start_time: 开始时间戳(毫秒)
            end_time: 结束时间戳(毫秒)
            strategy_fn: 策略函数,输入 (timestamp, klines) -> signal (1/-1/0)
            interval: K线周期
        """
        # 获取历史 K 线数据
        klines = self.client.get_kline(
            symbol, interval, 
            limit=1000,
            start_time=start_time
        )
        
        print(f"加载 {len(klines)} 条 K 线数据")
        
        for i, bar in enumerate(klines):
            timestamp = bar["timestamp"]
            
            if timestamp > end_time:
                break
            
            # 调用策略函数
            signal = strategy_fn(timestamp, klines[:i+1])
            
            if signal != 0:
                # 估算 ADV(简化:使用前20日均值)
                recent_klines = klines[max(0, i-20):i]
                avg_volume = np.mean([k.get("volume", 0) for k in recent_klines])
                
                order = Order(
                    symbol=symbol,
                    side="buy" if signal > 0 else "sell",
                    qty=100,  # 简化,实际应基于资本管理
                    timestamp=timestamp
                )
                
                # 估算滑点(使用最近一天的波动率)
                volatility = np.std([k.get("close", 0) / k.get("open", 1) - 1 
                                    for k in recent_klines[-5:]]) if len(recent_klines) > 5 else 0.01
                
                try:
                    fill = self.slippage_sim.execute_order(
                        order,
                        current_price=bar["close"],
                        adv=avg_volume,
                        volatility=volatility
                    )
                    
                    self.trades.append(fill)
                    
                    cost = fill.fill_price * fill.order.qty
                    if fill.order.side == "buy":
                        self.capital -= cost
                        self.position += fill.order.qty
                    else:
                        self.capital += cost
                        self.position -= fill.order.qty
                        
                except ValueError as e:
                    print(f"执行失败 @ {timestamp}: {e}")
        
        return self._generate_report()
    
    def _generate_report(self) -> Dict:
        """生成回测报告"""
        if not self.trades:
            return {"message": "无交易"}
        
        returns = []
        for i in range(len(self.trades) - 1):
            # 简化的PnL计算(假设当日平仓)
            if self.trades[i+1].order.side != self.trades[i].order.side:
                pnl = (self.trades[i+1].fill_price - self.trades[i].fill_price) * 100
                returns.append(pnl)
        
        return {
            "total_trades": len(self.trades),
            "winning_trades": sum(1 for r in returns if r > 0),
            "losing_trades": sum(1 for r in returns if r <= 0),
            "avg_slippage_bps": np.mean([t.slippage_bps for t in self.trades]),
            "total_slippage_cost": sum(
                t.slippage_bps / 10000 * t.fill_price * t.order.qty 
                for t in self.trades
            ),
            "final_capital": self.capital,
            "final_position": self.position
        }


# ============================================================
# 使用示例
# ============================================================

if __name__ == "__main__":
    API_KEY = os.environ.get("TICKDB_API_KEY")
    
    if not API_KEY:
        print("错误:请设置 TICKDB_API_KEY 环境变量")
        exit(1)
    
    backtester = SlippageAwareBacktester(API_KEY, initial_capital=100000)
    
    # 定义简单策略:均线金叉买入,死叉卖出
    def simple_ma_strategy(timestamp, klines):
        if len(klines) < 20:
            return 0
        
        closes = [k["close"] for k in klines[-20:]]
        ma5 = sum(closes[-5:]) / 5
        ma20 = sum(closes) / 20
        
        prev_closes = [k["close"] for k in klines[-21:-1]]
        prev_ma5 = sum(prev_closes[-5:]) / 5
        prev_ma20 = sum(prev_closes) / 20
        
        # 金叉
        if prev_ma5 <= prev_ma20 and ma5 > ma20:
            return 1
        # 死叉
        elif prev_ma5 >= prev_ma20 and ma5 < ma20:
            return -1
        
        return 0
    
    # 运行回测(2024年全年)
    end_time = int(datetime(2024, 12, 31).timestamp() * 1000)
    report = backtester.run_backtest(
        symbol="AAPL.US",
        start_time=int(datetime(2024, 1, 1).timestamp() * 1000),
        end_time=end_time,
        strategy_fn=simple_ma_strategy,
        interval="1d"
    )
    
    print("\n=== 回测报告 ===")
    print(f"总交易次数: {report['total_trades']}")
    print(f"盈利交易: {report['winning_trades']}")
    print(f"亏损交易: {report['losing_trades']}")
    print(f"平均滑点: {report['avg_slippage_bps']:.2f} 基点")
    print(f"总滑点成本: ${report['total_slippage_cost']:.2f}")
    print(f"最终资金: ${report['final_capital']:.2f}")

⚠️ 工程预警:上述回测引擎简化了 ADV 估算和信号生成逻辑,仅用于演示滑点模拟的实现思路。生产级回测引擎需要更完善的资金管理模块、订单管理模块和风险管理模块。


八、结语:回测不是模拟,是校准

滑点模拟不是给回测引擎打补丁,而是一种重新校准策略的过程

当你开始认真模拟滑点时,你可能会发现:

  • 你的“圣杯策略”实际上只是扣除了交易成本后勉强跑赢基准
  • 你的高频剥头皮策略在扣除滑点后根本无利可图
  • 你以为的高夏普比率,实际上是高估了流动性、忽略了真实冲击成本

滑点是市场给你的隐性税。只有当你开始精确地测量它、理解它、回测它,你的策略才真正从“纸上谈兵”走向“实战准备”。


下一步行动

如果你希望亲手实现本文的滑点模拟系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 使用上文代码连接 TickDB,订阅 depth 频道获取实时订单簿数据
  4. 将订单簿数据接入你的回测引擎,启用本文的滑点估算模型

如果你需要 10 年级别的美股历史 K 线数据用于策略回测校准,联系 [email protected] 了解机构版方案。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以更快速地接入 TickDB 数据并运行本文的代码示例。


风险提示:本文不构成任何投资建议。回测结果不能保证未来收益,实际交易中的滑点受市场状态、订单规模、流动性等多重因素影响,存在不确定性。