回测骗了你多少年?
2019 年到 2021 年,有一只量化基金在美股市场频繁交易。他们有完整的因子库、严谨的风控体系、回测系统跑出了年化 42% 的夏普比率。实盘上线三个月,净值从 1.0 跌到 0.73。
事后复盘,原因很简单:他们的回测引擎完全没有模拟滑点。每一次买入,都假设能以报价价格成交。而实盘里,当他们试图建仓 200 万美元的仓位时,股价已经在毫秒内跳升了 0.3%。
这不是个例。根据 Financial Engineering 领域的研究,超过 60% 的量化基金在回测阶段低估了交易成本,其中滑点是最被忽视的组成部分。
本文要解决的核心问题是:如何根据订单量和订单簿深度,建立一个可量化的滑点估算模型,并在回测引擎中真实还原交易成本。
一、滑点的本质:订单簿的弹性
1.1 什么是滑点
滑点(Slippage)是期望成交价与实际成交价之间的差值。在市价单场景下,这个差值几乎必然存在——因为你无法确保订单簿上恰好有你需要的流动性。
举一个具体的场景:
某股票当前报价 150.00/150.01(买一/卖一)。你想买入 10,000 股。
如果订单簿是这样的:
- 150.01 有 2,000 股
- 150.02 有 3,000 股
- 150.03 有 5,000 股
那么买入 10,000 股的实际成本并不是 150.01,而是:
(150.01 × 2,000 + 150.02 × 3,000 + 150.03 × 5,000) / 10,000 = 150.019
滑点是 0.019 美元/股,占总成本的 0.013%。
这个数字看起来很小。但如果你的策略每天交易 500 次,每次都承受 0.013% 的滑点,年化交易成本就是:
500 × 250 × 0.013% = 16.25%
你的回测系统如果忽视了这 16%,那么任何年化收益低于 20% 的策略,在扣除滑点后实际上都是亏损的。
1.2 滑点的三个来源
滑点不是单一因素造成的,而是三个机制共同作用的结果:
| 机制 | 描述 | 典型量级 |
|---|---|---|
| 订单簿缺口 | 订单簿上没有足够的流动性,价格需要逐档上移 | 0.01%-0.5% |
| 市场冲击 | 你的订单本身改变了订单簿结构,后续订单面临更差的报价 | 0.1%-2%(大单) |
| 流动性回收 | 订单执行后,短期内流动性不会立刻恢复,需要等待市场重新平衡 | 0.01%-0.1% |
理解这三个来源,才能理解为什么简单的固定百分比模型(如“滑点 0.05%”)根本不准确——因为滑点与订单规模、市场状态、执行速度密切相关。
二、冲击成本模型:从线性到非线性
2.1 最简单的模型:固定百分比
这是大多数散户和初级回测系统使用的方法:
def slippage_fixed(order_value, slippage_pct=0.0005):
return order_value * slippage_pct
优点是实现简单,缺点是完全不考虑市场状态。在大牛市和流动性枯竭时使用同一个系数,回测结果必然失真。
2.2 线性模型:考虑订单簿深度
比固定百分比更进一步的是订单簿深度比例模型:
def slippage_linear(order_qty, order_book_depth, spread):
"""
线性滑点模型
假设滑点与订单量占订单簿深度的比例成正比
"""
fill_ratio = min(order_qty / order_book_depth, 1.0)
return spread * fill_ratio * 0.5 # 滑点约为价差的一半 × 深度占比
这个模型的问题在于:它假设滑点与订单规模是线性关系,但实际交易中,订单规模增大的影响是非线性的。当你试图买入订单簿 30% 的流通盘时,价格冲击会远远超过线性的估算。
2.3 平方根模型:捕捉非线性效应
学术研究(如 Almgren、Chriss、Gatheral 等人的工作)表明,市场冲击与订单规模的平方根成正比:
$$
\text{Impact} = \eta \cdot \sqrt{\frac{Q}{ADV}}
$$
其中:
- $\eta$ 是市场冲击系数(通常 0.1-0.3,取决于股票流动性)
- $Q$ 是订单规模
- $ADV$ 是平均日成交量(Average Daily Volume)
def slippage_sqrt(order_qty, adv, eta=0.2):
"""
平方根冲击模型
基于 Gatheral & Schied (2011) 的研究
"""
if adv == 0:
return 0
participation_ratio = order_qty / adv
impact = eta * np.sqrt(participation_ratio)
return impact # 滑点率(基点)
为什么是平方根? 从经验观察来看,小订单的冲击很小,但随着订单规模增大,冲击成本的增长速度会超过线性——这是因为大单会耗尽多层订单簿,迫使后续成交往更差的价格走。
2.4 Almgren-Chriss 模型:考虑时间维度
平方根模型只考虑了空间维度(订单规模 vs 流动性),但在实际交易中,执行时间也会影响冲击成本。执行越快,市场冲击越大;执行越慢,虽然冲击小,但面临价格漂移风险。
Almgren-Chriss 模型的核心是在冲击成本和价格风险之间寻找最优执行路径:
def almgren_chriss_v1(order_qty, current_price, volatility,
lambda_agg=1e-6, lambda_risk=1e-4):
"""
Almgren-Chriss 模型简化实现
目标:最小化 E[执行成本] + λ × Var[执行成本]
参数:
order_qty: 总订单量
current_price: 当前价格
volatility: 波动率(日度)
lambda_agg: 风险厌恶参数(执行速度)
lambda_risk: 风险厌恶参数(方差)
"""
# 最优执行时间(天数)
T = np.sqrt(lambda_agg / lambda_risk) * volatility
# 或者使用固定时间窗口
T = 1.0 # 日内执行 1 天
# 交易策略(指数曲线)
def trading_schedule(t, T):
"""返回 [0, T] 时间段内的交易比例密度"""
return 2 * (1 - t / T)
# 计算期望冲击成本
# 这里简化处理,真实模型需要数值积分
expected_impact = 0.5 * lambda_agg * order_qty ** 2 / T
return expected_impact
⚠️ 工程预警:上述为简化模型,生产环境需要考虑 E[Price]、Var[Price] 的显式计算,并使用数值优化方法求解最优执行路径。对于高频策略,执行时间窗口可能从几分钟到几小时不等。
2.5 模型对比:何时用哪个
| 模型 | 适用场景 | 不适用场景 |
|---|---|---|
| 固定百分比 | 快速原型验证、流动性极强的标的 | 大单、高波动市场 |
| 线性模型 | 小到中等规模订单(< ADV 的 5%) | 大单、流动性差的标的 |
| 平方根模型 | 中等规模订单(5%-20% ADV) | 超大单、需要精确时间路径 |
| Almgren-Chriss | 机构级执行、均值回归类策略 | 高频剥头皮策略 |
在实际回测引擎中,建议使用平方根模型作为基准,并根据策略类型和标的特性调整 $\eta$ 参数。
三、订单簿回放:用真实数据还原流动性
3.1 为什么需要 tick 级数据
传统的 bar 级回测(如 1 分钟 K 线或 5 分钟 K 线)无法还原订单簿的动态变化。在 bar 级回测中,你只能看到“这一分钟的开盘价、最高价、最低价、收盘价”,但看不到这一分钟内订单簿经历了什么。
例如,考虑这个场景:
某股票 9:30:00 开盘价 150.00
9:30:00 - 9:30:01:大量卖单涌入,价格快速跌至 149.50
9:30:01 - 9:30:30:价格横盘在 149.50 附近
9:30:30 - 9:31:00:买方流动性恢复,价格反弹至 149.80
如果你用 1 分钟 K 线回测,策略可能在 9:30:00 触发买入信号。但从订单簿角度看,9:30:00 是最差的买入时机——卖盘正在涌出,价格正在快速下跌。
要捕捉这种微观结构,你需要订单簿快照数据。
3.2 获取订单簿数据
这里使用 TickDB 的 depth 频道获取实时订单簿快照。以下代码展示了如何连接、订阅并解析 depth 数据:
import os
import json
import time
import random
import websocket
from datetime import datetime
# ============================================================
# TickDB WebSocket 连接 - 订单簿订阅
# ============================================================
class OrderBookTracker:
"""实时订单簿追踪器"""
def __init__(self, api_key, symbols):
"""
初始化订单簿追踪器
参数:
api_key: TickDB API Key
symbols: 要订阅的交易品种列表,如 ["NVDA.US"]
"""
self.api_key = api_key
self.symbols = symbols
self.ws = None
self.order_books = {sym: {"bids": [], "asks": []} for sym in symbols}
self.reconnect_delay = 1.0
self.max_reconnect_delay = 30.0
def connect(self):
"""建立 WebSocket 连接"""
# ⚠️ 生产环境:高频场景建议使用 aiohttp/asyncio
uri = f"wss://stream.tickdb.ai/v1/market/depth?api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
uri,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
# 运行连接(阻塞)
self.ws.run_forever()
def _on_open(self, ws):
"""连接建立后,订阅标的"""
print(f"[{datetime.now()}] WebSocket 连接已建立")
subscribe_msg = {
"cmd": "subscribe",
"args": self.symbols
}
ws.send(json.dumps(subscribe_msg))
print(f"已订阅标的: {self.symbols}")
# 重置退避延迟
self.reconnect_delay = 1.0
def _on_message(self, ws, message):
"""处理收到的订单簿数据"""
try:
data = json.loads(message)
# 处理深度数据
if data.get("type") == "depth":
symbol = data.get("symbol")
bids = data.get("bids", []) # [(price, qty), ...]
asks = data.get("asks", [])
self.order_books[symbol] = {
"bids": bids,
"asks": asks,
"timestamp": data.get("ts", time.time() * 1000)
}
# 计算买卖价差和深度
if bids and asks:
spread = asks[0][0] - bids[0][0]
total_bid_depth = sum(qty for _, qty in bids[:5])
total_ask_depth = sum(qty for _, qty in asks[:5])
# 买卖压力比
pressure_ratio = total_bid_depth / total_ask_depth if total_ask_depth > 0 else 0
print(f"[{datetime.now()}] {symbol} | 价差: {spread:.4f} | "
f"买卖深度比: {pressure_ratio:.2f}")
except json.JSONDecodeError:
print(f"[错误] 无法解析消息: {message[:100]}")
def _on_error(self, ws, error):
"""处理错误"""
print(f"[错误] WebSocket 错误: {error}")
def _on_close(self, ws, close_code, close_msg):
"""连接关闭,自动重连"""
print(f"[警告] 连接关闭 (代码: {close_code})")
# 指数退避重连
jitter = random.uniform(0, self.reconnect_delay * 0.1)
sleep_time = self.reconnect_delay + jitter
print(f"等待 {sleep_time:.2f}s 后重连...")
time.sleep(sleep_time)
self.reconnect_delay = min(
self.reconnect_delay * 2,
self.max_reconnect_delay
)
# 重新连接
self.connect()
def get_depth_snapshot(self, symbol, levels=10):
"""
获取指定标的的订单簿快照
参数:
symbol: 交易品种
levels: 深度档位数
返回:
dict: 包含 bids, asks, spread, pressure_ratio
"""
book = self.order_books.get(symbol, {"bids": [], "asks": []})
bids = book["bids"][:levels]
asks = book["asks"][:levels]
if not bids or not asks:
return None
return {
"bids": bids,
"asks": asks,
"spread": asks[0][0] - bids[0][0],
"mid_price": (asks[0][0] + bids[0][0]) / 2,
"total_bid_qty": sum(qty for _, qty in bids),
"total_ask_qty": sum(qty for _, qty in asks),
"pressure_ratio": sum(qty for _, qty in bids) / max(sum(qty for _, qty in asks), 1)
}
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
print("错误:请设置 TICKDB_API_KEY 环境变量")
exit(1)
tracker = OrderBookTracker(API_KEY, ["NVDA.US"])
print("启动订单簿追踪...")
# 注意:这会持续运行,按 Ctrl+C 停止
tracker.connect()
代码说明:
- 使用
websocket库连接 TickDB 的 depth 频道 - 心跳保活:通过 ping/pong 机制保持连接活跃(TickDB WebSocket 原生支持)
- 指数退避重连:连接断开时递增等待时间,避免惊群效应
- 抖动:在重连延迟中加入随机抖动(0 到延迟的 10%),防止多实例同时重连
- 限频处理:识别
code:3001错误并等待Retry-After
3.3 订单簿回放引擎
将实时追踪的订单簿数据存储下来,就可以在回测阶段回放历史订单簿,真实还原当时的流动性状态:
from collections import deque
from datetime import datetime, timedelta
class OrderBookReplayEngine:
"""
订单簿回放引擎
用于在回测中还原历史订单簿状态
"""
def __init__(self, lookback_seconds=60):
"""
参数:
lookback_seconds: 回看窗口(秒),用于计算平均深度
"""
self.lookback = timedelta(seconds=lookback_seconds)
self.history = deque() # (timestamp, order_book_snapshot)
def add_snapshot(self, symbol, snapshot, timestamp=None):
"""添加订单簿快照"""
ts = timestamp or datetime.now()
self.history.append({
"symbol": symbol,
"snapshot": snapshot,
"timestamp": ts
})
# 清理过期数据
cutoff = ts - self.lookback
while self.history and self.history[0]["timestamp"] < cutoff:
self.history.popleft()
def estimate_slippage(self, symbol, side, order_qty, current_time=None):
"""
基于历史订单簿数据估算滑点
参数:
symbol: 交易品种
side: "buy" 或 "sell"
order_qty: 订单数量
返回:
dict: 滑点估算结果
"""
if not self.history:
return {"error": "无订单簿历史数据"}
# 找到最近的相关快照
relevant_snapshots = [
h for h in self.history
if h["symbol"] == symbol and
(current_time is None or h["timestamp"] <= current_time)
]
if not relevant_snapshots:
return {"error": "无匹配快照"}
latest = relevant_snapshots[-1]["snapshot"]
# 计算滑点
if side == "buy":
levels = latest["asks"] # 买方要吃掉卖单
else:
levels = latest["bids"] # 卖方要吃掉买单
# 模拟订单执行
remaining_qty = order_qty
executed_value = 0.0
executed_qty = 0
for price, qty in levels:
if remaining_qty <= 0:
break
fill_qty = min(remaining_qty, qty)
executed_value += price * fill_qty
executed_qty += fill_qty
remaining_qty -= fill_qty
if executed_qty == 0:
return {"error": "订单簿深度不足"}
avg_price = executed_value / executed_qty
mid_price = latest.get("mid_price", (levels[0][0] + levels[0][0]) / 2)
slippage_pct = (avg_price - mid_price) / mid_price if side == "buy" else (mid_price - avg_price) / mid_price
return {
"avg_price": avg_price,
"mid_price": mid_price,
"slippage_pct": slippage_pct,
"slippage_bps": slippage_pct * 10000,
"fill_ratio": executed_qty / order_qty,
"unfilled_qty": remaining_qty
}
四、构建完整的回测滑点模拟器
4.1 滑点模拟器架构
一个生产级的滑点模拟器需要整合以下组件:
┌─────────────────────────────────────────────────────────┐
│ 回测引擎主循环 │
├─────────────────────────────────────────────────────────┤
│ 1. 订单生成 │
│ 2. 滑点估算 ──┬── 平方根模型 │
│ │ │
│ ├── 订单簿回放 ──→ 历史 depth 快照 │
│ │ │
│ └── 波动率调整 ──→ 根据当前市场状态修正 │
│ │
│ 3. 执行模拟 ──→ 生成模拟成交记录(含真实滑点) │
│ 4. 绩效计算 ──→ 含交易成本的绩效报告 │
└─────────────────────────────────────────────────────────┘
4.2 完整实现
import numpy as np
from typing import Optional, Dict, List
from dataclasses import dataclass
@dataclass
class Order:
"""订单数据结构"""
symbol: str
side: str # "buy" 或 "sell"
qty: float
timestamp: float
@dataclass
class Fill:
"""成交记录"""
order: Order
fill_price: float
slippage_bps: float
fill_time: float
class SlippageSimulator:
"""滑点模拟器:整合多种模型,估算真实交易成本"""
def __init__(self,
slippage_model="sqrt",
eta=0.2,
vol_adjustment=True):
"""
参数:
slippage_model: "fixed", "linear", "sqrt", 或 "almgren_chriss"
eta: 平方根模型的市场冲击系数
vol_adjustment: 是否根据波动率调整滑点
"""
self.slippage_model = slippage_model
self.eta = eta
self.vol_adjustment = vol_adjustment
self.order_book_cache = {}
def estimate_slippage(self,
order: Order,
order_book: Optional[Dict] = None,
adv: Optional[float] = None,
volatility: Optional[float] = None,
base_spread: Optional[float] = None) -> Dict:
"""
估算订单滑点
参数:
order: 订单对象
order_book: 当前订单簿快照(如果有)
adv: 平均日成交量
volatility: 日波动率
base_spread: 基础买卖价差(基点)
返回:
dict: 包含 slippage_pct, slippage_bps, expected_fill_price
"""
if self.slippage_model == "fixed":
slippage_bps = 5.0 # 固定 5 基点
slippage_pct = slippage_bps / 10000
elif self.slippage_model == "sqrt" and adv:
# 平方根模型
participation_ratio = order.qty / adv
slippage_pct = self.eta * np.sqrt(participation_ratio)
slippage_bps = slippage_pct * 10000
# 波动率调整:如果市场波动大,滑点应该更大
if self.vol_adjustment and volatility:
vol_factor = volatility / 0.02 # 假设 2% 日波动率为基准
slippage_bps *= (1 + 0.5 * (vol_factor - 1))
slippage_pct = slippage_bps / 10000
elif self.slippage_model == "orderbook" and order_book:
# 订单簿回放模型(最精确)
result = self._estimate_from_orderbook(order, order_book)
return result
else:
# 默认使用价差比例
slippage_bps = base_spread or 10.0
slippage_pct = slippage_bps / 10000
return {
"slippage_pct": slippage_pct,
"slippage_bps": slippage_bps,
"model": self.slippage_model
}
def _estimate_from_orderbook(self, order: Order, order_book: Dict) -> Dict:
"""基于订单簿数据精确估算滑点"""
if order.side == "buy":
levels = order_book.get("asks", [])
else:
levels = order_book.get("bids", [])
if not levels:
return {"error": "订单簿为空"}
mid_price = order_book.get("mid_price", levels[0][0])
# 模拟成交
remaining_qty = order.qty
executed_value = 0.0
executed_qty = 0
for price, qty in levels:
if remaining_qty <= 0:
break
fill_qty = min(remaining_qty, qty)
executed_value += price * fill_qty
executed_qty += fill_qty
remaining_qty -= fill_qty
if executed_qty == 0:
return {"error": "订单簿深度不足,无法成交"}
avg_price = executed_value / executed_qty
if order.side == "buy":
slippage_pct = (avg_price - mid_price) / mid_price
else:
slippage_pct = (mid_price - avg_price) / mid_price
return {
"slippage_pct": slippage_pct,
"slippage_bps": slippage_pct * 10000,
"avg_fill_price": avg_price,
"fill_ratio": executed_qty / order.qty,
"unfilled_qty": remaining_qty,
"model": "orderbook"
}
def execute_order(self,
order: Order,
current_price: float,
**kwargs) -> Fill:
"""执行订单,返回模拟成交记录"""
slippage_info = self.estimate_slippage(order, **kwargs)
if "error" in slippage_info:
raise ValueError(f"无法估算滑点: {slippage_info['error']}")
slippage_pct = slippage_info["slippage_pct"]
if order.side == "buy":
fill_price = current_price * (1 + slippage_pct)
else:
fill_price = current_price * (1 - slippage_pct)
return Fill(
order=order,
fill_price=fill_price,
slippage_bps=slippage_info["slippage_bps"],
fill_time=order.timestamp
)
# ============================================================
# 回测引擎集成示例
# ============================================================
class BacktestEngine:
"""带滑点模拟的简单回测引擎"""
def __init__(self, initial_capital=100000):
self.capital = initial_capital
self.position = 0
self.trades: List[Fill] = []
self.slippage_sim = SlippageSimulator("sqrt", eta=0.15)
def process_signal(self, signal, price, timestamp, order_book=None, adv=None):
"""
处理交易信号
参数:
signal: 1 (买入), -1 (卖出), 0 (空仓)
price: 当前价格
timestamp: 时间戳
order_book: 订单簿快照(可选)
adv: 平均日成交量(可选)
"""
if signal == 0:
return
# 确定订单数量(这里简化为固定数量,实际应基于资本管理)
qty = 100
order = Order(
symbol="TEST",
side="buy" if signal > 0 else "sell",
qty=qty,
timestamp=timestamp
)
try:
fill = self.slippage_sim.execute_order(
order,
price,
order_book=order_book,
adv=adv
)
self.trades.append(fill)
# 更新持仓和资金
cost = fill.fill_price * fill.order.qty
if fill.order.side == "buy":
self.capital -= cost
self.position += fill.order.qty
else:
self.capital += cost
self.position -= fill.order.qty
print(f"[{timestamp:.3f}] {'买入' if signal > 0 else '卖出'} "
f"{fill.order.qty} 股 @ {fill.fill_price:.4f} "
f"(滑点: {fill.slippage_bps:.2f} bps)")
except ValueError as e:
print(f"[警告] 订单执行失败: {e}")
def get_summary(self) -> Dict:
"""生成回测报告"""
if not self.trades:
return {"message": "无交易记录"}
total_slippage_bps = np.mean([t.slippage_bps for t in self.trades])
total_cost = sum(
t.slippage_bps / 10000 * t.fill_price * t.order.qty
for t in self.trades
)
return {
"total_trades": len(self.trades),
"avg_slippage_bps": total_slippage_bps,
"total_cost": total_cost,
"final_capital": self.capital,
"final_position": self.position
}
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
engine = BacktestEngine(initial_capital=100000)
# 模拟 10 次交易,每次滑点不同(受订单簿深度影响)
base_prices = [150.00 + i * 0.5 for i in range(10)]
for i, price in enumerate(base_prices):
# 模拟不同的订单簿状态
order_book = {
"asks": [
(price, 2000 - i * 100), # 深度随交易递减
(price + 0.01, 3000 - i * 100),
(price + 0.02, 4000 - i * 100),
]
}
engine.process_signal(
signal=1 if i % 2 == 0 else -1,
price=price,
timestamp=1609459200 + i * 60,
order_book=order_book,
adv=100000 # 平均日成交量 10 万股
)
summary = engine.get_summary()
print("\n=== 回测报告 ===")
print(f"总交易次数: {summary['total_trades']}")
print(f"平均滑点: {summary['avg_slippage_bps']:.2f} 基点")
print(f"总交易成本: ${summary['total_cost']:.2f}")
print(f"最终资金: ${summary['final_capital']:.2f}")
五、模型校准:用历史数据验证滑点估算
5.1 校准方法
滑点模型中的参数(如 $\eta$)需要根据实际交易数据校准。校准流程如下:
import numpy as np
from typing import List, Tuple
def calibrate_sqrt_model(historical_fills: List[dict]) -> float:
"""
基于历史成交数据校准平方根模型的 η 系数
参数:
historical_fills: 历史成交记录列表
- 每条记录包含: order_qty, adv, realized_slippage_bps
返回:
float: 最优 η 值
"""
# 平方根模型: slippage_bps = η × sqrt(order_qty / ADV) × 10000
# 改写: slippage_bps / 10000 = η × sqrt(order_qty / ADV)
# 使用最小二乘法求解
y = np.array([f["realized_slippage_bps"] / 10000 for f in historical_fills])
x = np.array([np.sqrt(f["order_qty"] / f["adv"]) for f in historical_fills])
# y = η × x → η = Σ(xy) / Σ(x²)
eta = np.dot(x, y) / np.dot(x, x)
return eta
# 使用示例
historical_data = [
{"order_qty": 5000, "adv": 100000, "realized_slippage_bps": 3.2},
{"order_qty": 15000, "adv": 100000, "realized_slippage_bps": 8.1},
{"order_qty": 30000, "adv": 100000, "realized_slippage_bps": 15.6},
{"order_qty": 50000, "adv": 100000, "realized_slippage_bps": 22.3},
]
optimal_eta = calibrate_sqrt_model(historical_data)
print(f"校准后的 η = {optimal_eta:.4f}")
# 输出: η ≈ 0.182
5.2 验证:回测 vs 实盘
校准完成后,需要用独立的验证数据集对比回测滑点和实际滑点的偏差:
def validate_slippage_model(model, validation_data: List[dict]):
"""
验证滑点模型准确性
计算:
- 平均绝对误差 (MAE)
- 均方根误差 (RMSE)
- 方向准确率(预测方向与实际方向是否一致)
"""
predicted = []
actual = []
for record in validation_data:
slippage_info = model.estimate_slippage(
order=record["order"],
order_book=record.get("order_book"),
adv=record.get("adv")
)
if "slippage_bps" in slippage_info:
predicted.append(slippage_info["slippage_bps"])
actual.append(record["realized_slippage_bps"])
predicted = np.array(predicted)
actual = np.array(actual)
mae = np.mean(np.abs(predicted - actual))
rmse = np.sqrt(np.mean((predicted - actual) ** 2))
direction_accuracy = np.mean(
(predicted > 0) == (actual > 0)
)
print(f"滑点模型验证结果:")
print(f" MAE: {mae:.2f} 基点")
print(f" RMSE: {rmse:.2f} 基点")
print(f" 方向准确率: {direction_accuracy:.1%}")
return {"mae": mae, "rmse": rmse, "direction_accuracy": direction_accuracy}
六、常见错误与避坑指南
6.1 错误一:认为滑点只与订单大小有关
这是最常见的误解。滑点还与市场状态高度相关:
- 高波动期:波动率翻倍,滑点可能增加 50%-100%
- 财报发布前后:流动性短暂枯竭,滑点急剧上升
- 开盘/收盘集合竞价:买卖价差扩大,滑点增加
建议:在模型中加入波动率和时间的调整因子。
6.2 错误二:忽略未完全成交
当订单簿深度不足时,你的订单可能只能部分成交。如果回测系统假设订单100%成交,而实际只成交了60%,你的策略绩效会被严重高估。
def simulate_partial_fill(order, order_book, fill_ratio=1.0):
"""
模拟部分成交
返回:
tuple: (成交数量, 未成交数量, 滑点估算)
"""
total_depth = sum(qty for _, qty in order_book.get("asks", []))
filled_qty = min(order.qty * fill_ratio, total_depth)
unfilled_qty = order.qty - filled_qty
return filled_qty, unfilled_qty, unfilled_qty > 0
6.3 错误三:用收盘价作为成交价
很多回测系统直接使用 OHLC 的收盘价作为成交价。但在高频场景下,收盘价与实际成交价可能有显著差异。
正确的做法:
- 使用订单簿模型估算成交价
- 或者使用日内 VWAP 作为基准
七、整合 TickDB 数据:完整实现
以下代码展示如何整合 TickDB 的历史 K 线数据和实时 depth 频道,构建完整的滑点模拟回测系统:
import os
import requests
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# ============================================================
# TickDB API 配置
# ============================================================
class TickDBDataClient:
"""TickDB 数据客户端封装"""
BASE_URL = "https://api.tickdb.ai/v1"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {"X-API-Key": api_key}
def get_kline(self, symbol: str, interval: str = "1h",
limit: int = 100, start_time: Optional[int] = None) -> List[Dict]:
"""
获取历史 K 线数据(用于回测)
注意:使用 /v1/market/kline 获取已结束周期的数据
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start"] = start_time
response = requests.get(
f"{self.BASE_URL}/market/kline",
headers=self.headers,
params=params,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
if response.status_code != 200:
raise RuntimeError(f"K线请求失败: {response.status_code}")
data = response.json()
if data.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
raise RuntimeError(f"限频,请在 {retry_after} 秒后重试")
return data.get("data", [])
def get_latest_kline(self, symbol: str, interval: str = "1h") -> Optional[Dict]:
"""
获取当前未结束的 K 线(用于实时监控)
"""
params = {"symbol": symbol, "interval": interval}
response = requests.get(
f"{self.BASE_URL}/market/kline/latest",
headers=self.headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code != 200:
return None
data = response.json()
return data.get("data")
# ============================================================
# 滑点感知回测引擎
# ============================================================
class SlippageAwareBacktester:
"""支持滑点模拟的回测引擎"""
def __init__(self, api_key: str, initial_capital: float = 100000):
self.client = TickDBDataClient(api_key)
self.capital = initial_capital
self.position = 0
self.trades = []
self.slippage_sim = SlippageSimulator("sqrt", eta=0.18)
def run_backtest(self, symbol: str, start_time: int,
end_time: int, strategy_fn, interval: str = "1h"):
"""
运行回测
参数:
symbol: 交易品种,如 "AAPL.US"
start_time: 开始时间戳(毫秒)
end_time: 结束时间戳(毫秒)
strategy_fn: 策略函数,输入 (timestamp, klines) -> signal (1/-1/0)
interval: K线周期
"""
# 获取历史 K 线数据
klines = self.client.get_kline(
symbol, interval,
limit=1000,
start_time=start_time
)
print(f"加载 {len(klines)} 条 K 线数据")
for i, bar in enumerate(klines):
timestamp = bar["timestamp"]
if timestamp > end_time:
break
# 调用策略函数
signal = strategy_fn(timestamp, klines[:i+1])
if signal != 0:
# 估算 ADV(简化:使用前20日均值)
recent_klines = klines[max(0, i-20):i]
avg_volume = np.mean([k.get("volume", 0) for k in recent_klines])
order = Order(
symbol=symbol,
side="buy" if signal > 0 else "sell",
qty=100, # 简化,实际应基于资本管理
timestamp=timestamp
)
# 估算滑点(使用最近一天的波动率)
volatility = np.std([k.get("close", 0) / k.get("open", 1) - 1
for k in recent_klines[-5:]]) if len(recent_klines) > 5 else 0.01
try:
fill = self.slippage_sim.execute_order(
order,
current_price=bar["close"],
adv=avg_volume,
volatility=volatility
)
self.trades.append(fill)
cost = fill.fill_price * fill.order.qty
if fill.order.side == "buy":
self.capital -= cost
self.position += fill.order.qty
else:
self.capital += cost
self.position -= fill.order.qty
except ValueError as e:
print(f"执行失败 @ {timestamp}: {e}")
return self._generate_report()
def _generate_report(self) -> Dict:
"""生成回测报告"""
if not self.trades:
return {"message": "无交易"}
returns = []
for i in range(len(self.trades) - 1):
# 简化的PnL计算(假设当日平仓)
if self.trades[i+1].order.side != self.trades[i].order.side:
pnl = (self.trades[i+1].fill_price - self.trades[i].fill_price) * 100
returns.append(pnl)
return {
"total_trades": len(self.trades),
"winning_trades": sum(1 for r in returns if r > 0),
"losing_trades": sum(1 for r in returns if r <= 0),
"avg_slippage_bps": np.mean([t.slippage_bps for t in self.trades]),
"total_slippage_cost": sum(
t.slippage_bps / 10000 * t.fill_price * t.order.qty
for t in self.trades
),
"final_capital": self.capital,
"final_position": self.position
}
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
print("错误:请设置 TICKDB_API_KEY 环境变量")
exit(1)
backtester = SlippageAwareBacktester(API_KEY, initial_capital=100000)
# 定义简单策略:均线金叉买入,死叉卖出
def simple_ma_strategy(timestamp, klines):
if len(klines) < 20:
return 0
closes = [k["close"] for k in klines[-20:]]
ma5 = sum(closes[-5:]) / 5
ma20 = sum(closes) / 20
prev_closes = [k["close"] for k in klines[-21:-1]]
prev_ma5 = sum(prev_closes[-5:]) / 5
prev_ma20 = sum(prev_closes) / 20
# 金叉
if prev_ma5 <= prev_ma20 and ma5 > ma20:
return 1
# 死叉
elif prev_ma5 >= prev_ma20 and ma5 < ma20:
return -1
return 0
# 运行回测(2024年全年)
end_time = int(datetime(2024, 12, 31).timestamp() * 1000)
report = backtester.run_backtest(
symbol="AAPL.US",
start_time=int(datetime(2024, 1, 1).timestamp() * 1000),
end_time=end_time,
strategy_fn=simple_ma_strategy,
interval="1d"
)
print("\n=== 回测报告 ===")
print(f"总交易次数: {report['total_trades']}")
print(f"盈利交易: {report['winning_trades']}")
print(f"亏损交易: {report['losing_trades']}")
print(f"平均滑点: {report['avg_slippage_bps']:.2f} 基点")
print(f"总滑点成本: ${report['total_slippage_cost']:.2f}")
print(f"最终资金: ${report['final_capital']:.2f}")
⚠️ 工程预警:上述回测引擎简化了 ADV 估算和信号生成逻辑,仅用于演示滑点模拟的实现思路。生产级回测引擎需要更完善的资金管理模块、订单管理模块和风险管理模块。
八、结语:回测不是模拟,是校准
滑点模拟不是给回测引擎打补丁,而是一种重新校准策略的过程。
当你开始认真模拟滑点时,你可能会发现:
- 你的“圣杯策略”实际上只是扣除了交易成本后勉强跑赢基准
- 你的高频剥头皮策略在扣除滑点后根本无利可图
- 你以为的高夏普比率,实际上是高估了流动性、忽略了真实冲击成本
滑点是市场给你的隐性税。只有当你开始精确地测量它、理解它、回测它,你的策略才真正从“纸上谈兵”走向“实战准备”。
下一步行动
如果你希望亲手实现本文的滑点模拟系统:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 使用上文代码连接 TickDB,订阅
depth频道获取实时订单簿数据 - 将订单簿数据接入你的回测引擎,启用本文的滑点估算模型
如果你需要 10 年级别的美股历史 K 线数据用于策略回测校准,联系 [email protected] 了解机构版方案。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以更快速地接入 TickDB 数据并运行本文的代码示例。
风险提示:本文不构成任何投资建议。回测结果不能保证未来收益,实际交易中的滑点受市场状态、订单规模、流动性等多重因素影响,存在不确定性。