滑点与冲击成本模拟:让回测更接近实盘

"回测时我是天才,实盘时我是韭菜。"

这句话在量化圈流传甚广,不是因为交易者谦虚,而是因为回测与实盘之间存在一道隐蔽却致命的鸿沟——滑点与冲击成本。你的策略在历史数据上每一次"假设成交",都隐含着一个不真实的假设:你的订单总能以你想要的价格立即成交。

这是一个危险的假设。

当资金量超过一定规模,当市场流动性出现波动,这个假设会像纸牌屋一样坍塌。本文拆解滑点的本质——冲击成本的量化模型,以及如何用订单簿回放和 tick 级回测让回测结果更贴近真实交易环境。


一、为什么你的回测在骗你

1.1 三个经典的回测幻觉

在展开技术细节之前,先厘清回测中三种最常见的"幻觉":

幻觉一:无限流动性幻觉。 大多数回测框架假设你的订单可以 100% 按市价成交,不需要考虑订单量对市场的影响。1,000 股可以,100,000 股也可以。这在实盘中是灾难。

幻觉二:零滑点幻觉。 回测价格直接取自 OHLC 或 VWAP,忽略了你下单瞬间盘口的情况。你以为你在 150.00 买,实际因为你的买盘消耗了卖一、卖二、卖三,价格已经移动到 150.05。

幻觉三:完美信息幻觉。 你的策略基于收盘价或下一秒价格做决策,但实盘中你看到的价格有延迟,你的订单有执行延迟,信息在传递过程中已经发生变化。

这三种幻觉叠加,让回测收益往往高出实盘 20%-50%,甚至更多。

1.2 滑点的本质:供需失衡的价格惩罚

滑点不是"倒霉",它是市场微观结构的必然产物。

当你下一笔买入订单,你消耗的是卖方提供的流动性。如果你的订单量超过当前卖一挂单量,你需要"吃掉"卖二、卖三……每一档的价格都更差。这部分额外的成本,就是冲击成本(Market Impact)

数学上:

$$
\text{Slippage} = P_{\text{actual}} - P_{\text{decision}}
$$

其中 $P_{\text{actual}}$ 是你的实际成交均价,$P_{\text{decision}}$ 是你决策时参考的价格。

冲击成本与订单量的关系不是线性的。当订单量超过订单簿某一深度的 10%,冲击成本开始显著上升;超过 30%,市场开始感知到你的存在,流动性提供商会调整报价。


二、冲击成本模型:从理论到量化

2.1 经典模型:Almgren-Chriss

金融工程领域最经典的冲击成本模型是 Almgren-Chriss 模型(2000)。它将执行成本分解为两部分:

  1. 永久冲击(Permanent Impact):你的交易行为改变了股票的均衡价格,后续交易都要承受这部分成本。
  2. 临时冲击(Temporary Impact):你的交易只影响当前订单簿,消耗的是流动性而非改变价格预期。

总执行成本:

$$
C = \eta \cdot X + \gamma \cdot \sigma \cdot \sqrt{\frac{X}{V}}
$$

其中:

  • $X$:总订单量
  • $V$:日均成交量(ADV)
  • $\sigma$:波动率
  • $\eta$:永久冲击系数
  • $\gamma$:临时冲击系数

当 $X/V$ 越大(即你的订单占日均成交量的比例越高),单位冲击成本越高。这是大资金必须分批建仓的核心原因。

2.2 简化模型:订单簿深度估算

在工程实践中,大多数量化开发者不需要完整的 Almgren-Chriss 框架。一个更实用的模型是基于订单簿深度的线性冲击估算

$$
\text{Slippage Rate} = \alpha \cdot \frac{Q}{D_{\text{level}}} + \beta
$$

其中:

  • $Q$:你的订单量
  • $D_{\text{level}}$:订单簿某一档的累计深度
  • $\alpha$:冲击系数(通常取 0.1-0.3,取决于标的的流动性)
  • $\beta$:基础交易成本(佣金 + spread 的一半)

示例:订单簿与滑点的关系

订单量 订单簿前 10 档总深度 占比 估算滑点
1,000 股 50,000 股 2% 0.02%
5,000 股 50,000 股 10% 0.10%
10,000 股 50,000 股 20% 0.22%
25,000 股 50,000 股 50% 0.65%

可以看到,滑点随订单量非线性增长。1,000 股时几乎可以忽略,25,000 股时滑点已经是策略基准差的 0.65%——这对于一个年化收益 5% 的策略,意味着实盘收益腰斩。

2.3 冲击系数校准

冲击系数 $\alpha$ 不是固定的,它随市场状态变化:

市场状态 冲击系数 $\alpha$ 说明
正常交易时段 0.10 - 0.15 流动性充足
财报前 30 分钟 0.20 - 0.25 观望情绪浓厚
财报后 1 小时 0.30 - 0.50 波动率急剧上升
流动性危机 0.50+ 订单簿稀疏

这意味着回测时使用固定的滑点假设是不准确的。正确的做法是根据市场状态动态调整冲击系数,这需要订单簿级别的历史数据。


三、订单簿回放:重建历史微观结构

3.1 什么是订单簿回放

订单簿回放(Order Book Replay) 是指用历史订单簿数据重建某个时间点的买卖盘结构,然后用你的订单量去"冲击"这个订单簿,计算实际成交价格。

过程:

  1. 获取历史某时刻的 depth 快照
  2. 用你的订单量从卖一档开始消耗
  3. 累加每一档的成交量 × 价格,得到总成本
  4. 计算平均成交价与决策价格的差异

这不是模拟,这是回放——你在用真实发生过的市场状态验证你的订单会产生什么结果。

3.2 深度数据获取

用 TickDB 的 WebSocket 订阅 depth 频道,可以获取实时的订单簿深度:

import os
import json
import time
import random
import websocket
from datetime import datetime

class TickDBDepthSubscriber:
    """
    TickDB depth 频道订阅器
    ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
    """
    
    def __init__(self, api_key: str, symbols: list, on_depth_callback):
        self.api_key = api_key
        self.symbols = symbols
        self.on_depth_callback = on_depth_callback
        self.ws = None
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 30.0
        self.base_reconnect_delay = 1.0
        
    def connect(self):
        """建立 WebSocket 连接"""
        try:
            url = f"wss://api.tickdb.ai/ws/depth?api_key={self.api_key}"
            self.ws = websocket.WebSocketApp(
                url,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )
            # 订阅指定标的的深度数据
            subscribe_msg = {
                "cmd": "subscribe",
                "args": self.symbols
            }
            self.ws.on_open = lambda ws: ws.send(json.dumps(subscribe_msg))
            self.ws.run_forever(ping_interval=20)
        except Exception as e:
            print(f"[{datetime.now()}] WebSocket 连接失败: {e}")
            self._schedule_reconnect()
            
    def _on_message(self, ws, message):
        """处理深度数据推送"""
        try:
            data = json.loads(message)
            if data.get("cmd") == "pong":
                return  # 心跳响应,忽略
            if data.get("code") == 3001:
                # 限频处理
                retry_after = int(data.get("headers", {}).get("Retry-After", 5))
                print(f"[{datetime.now()}] 限频,等待 {retry_after}s")
                time.sleep(retry_after)
                return
            if "data" in data:
                for symbol, depth_data in data["data"].items():
                    self.on_depth_callback(symbol, depth_data)
        except json.JSONDecodeError:
            pass
            
    def _on_error(self, ws, error):
        print(f"[{datetime.now()}] WebSocket 错误: {error}")
        
    def _on_close(self, ws, close_code, close_msg):
        print(f"[{datetime.now()}] 连接关闭,code={close_code}")
        self._schedule_reconnect()
        
    def _schedule_reconnect(self):
        """指数退避重连 + 抖动"""
        delay = min(
            self.base_reconnect_delay * (2 ** self.reconnect_delay),
            self.max_reconnect_delay
        )
        # 添加抖动避免惊群效应
        jitter = random.uniform(0, delay * 0.1)
        total_delay = delay + jitter
        print(f"[{datetime.now()}] {total_delay:.2f}s 后重连...")
        time.sleep(total_delay)
        self.reconnect_delay += 1
        self.connect()
        
    def _send_ping(self):
        """心跳保活"""
        if self.ws and self.ws.keep_running:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
            except Exception as e:
                print(f"[{datetime.now()}] 心跳发送失败: {e}")


# 使用示例
def handle_depth(symbol: str, depth: dict):
    """处理深度数据回调"""
    bids = depth.get("bids", [])  # 买盘 [[price, volume], ...]
    asks = depth.get("asks", [])  # 卖盘 [[price, volume], ...]
    print(f"[{datetime.now()}] {symbol} 深度更新: 买{len(bids)}档/卖{len(asks)}档")
    for level in bids[:5]:
        print(f"  买 {level[0]:.2f} × {level[1]}")


API_KEY = os.environ.get("TICKDB_API_KEY")
subscriber = TickDBDepthSubscriber(
    api_key=API_KEY,
    symbols=["AAPL.US", "TSLA.US"],
    on_depth_callback=handle_depth
)
subscriber.connect()

3.3 历史深度数据回放

实盘监控需要实时数据,但回测需要历史数据。用 TickDB 的 /v1/market/depth 接口可以获取历史某个时间点附近的订单簿快照:

import os
import requests
from datetime import datetime, timezone
from typing import Optional

class TickDBHistoricalDepth:
    """
    获取 TickDB 历史深度快照
    ⚠️ 历史深度数据仅部分市场支持,需确认标的可用性
    """
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.tickdb.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": self.api_key,
            "Content-Type": "application/json"
        })
        
    def get_historical_depth(
        self, 
        symbol: str, 
        timestamp: int, 
        limit: int = 10
    ) -> Optional[dict]:
        """
        获取指定时间点附近的历史深度快照
        
        Args:
            symbol: 交易品种,如 "BTC.USDT"
            timestamp: Unix 毫秒时间戳
            limit: 返回档位数
            
        Returns:
            订单簿深度数据,包含 bids 和 asks
        """
        try:
            response = self.session.get(
                f"{self.base_url}/market/depth",
                params={
                    "symbol": symbol,
                    "timestamp": timestamp,
                    "limit": limit
                },
                timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
            )
            data = response.json()
            
            if data.get("code") == 0:
                return data.get("data")
            else:
                self._handle_error(data, symbol)
                
        except requests.Timeout:
            print(f"[{datetime.now()}] 请求超时")
            raise
        except requests.RequestException as e:
            print(f"[{datetime.now()}] 请求异常: {e}")
            raise
            
    def _handle_error(self, response: dict, symbol: str = None):
        """统一错误处理"""
        code = response.get("code", 0)
        msg = response.get("message", "未知错误")
        
        error_map = {
            1001: "API Key 无效",
            1002: "API Key 缺失",
            2002: f"交易品种 {symbol} 不存在或暂不支持深度数据",
            3001: "请求频率超限,请稍后重试"
        }
        
        if code in error_map:
            raise ValueError(f"[TickDB Error {code}] {error_map[code]}: {msg}")
        else:
            raise RuntimeError(f"[TickDB Error {code}] {msg}")


# 使用示例
def demo_historical_depth():
    api_key = os.environ.get("TICKDB_API_KEY")
    client = TickDBHistoricalDepth(api_key)
    
    # 获取 2024-02-15 16:00:00 UTC 附近的 BTC 深度快照
    # (用于模拟某次行情波动时的订单簿状态)
    target_time = datetime(2024, 2, 15, 16, 0, 0, tzinfo=timezone.utc)
    timestamp_ms = int(target_time.timestamp() * 1000)
    
    try:
        depth = client.get_historical_depth("BTC.USDT", timestamp_ms, limit=10)
        if depth:
            print(f"时间点: {target_time}")
            print("--- 卖盘 (Asks) ---")
            for price, volume in depth.get("asks", [])[:5]:
                print(f"  {price:.2f} × {volume}")
            print("--- 买盘 (Bids) ---")
            for price, volume in depth.get("bids", [])[:5]:
                print(f"  {price:.2f} × {volume}")
    except ValueError as e:
        print(f"获取失败: {e}")

四、冲击成本计算引擎

4.1 核心算法实现

有了订单簿数据,下一步是计算当你的订单"冲击"这个订单簿时,实际成交价格是多少:

from dataclasses import dataclass
from typing import List, Tuple, Optional

@dataclass
class OrderBookLevel:
    """订单簿档位"""
    price: float
    volume: float
    
@dataclass
class ExecutionResult:
    """执行结果"""
    avg_price: float      # 平均成交价
    slippage_bps: float   # 滑点(基点)
    filled_qty: float     # 成交数量
    remaining_qty: float # 未成交数量
    levels_consumed: int  # 消耗的档位数


class ImpactCostCalculator:
    """
    冲击成本计算引擎
    
    核心算法:从订单簿顶端开始消耗,计算加权平均成交价
    """
    
    def __init__(self, impact_coefficient: float = 0.15):
        """
        Args:
            impact_coefficient: 冲击系数,根据市场状态动态调整
        """
        self.impact_coefficient = impact_coefficient
        
    def calculate_execution(
        self,
        order_side: str,           # "buy" 或 "sell"
        order_qty: float,
        bids: List[OrderBookLevel],
        asks: List[OrderBookLevel],
        decision_price: float = None,
        base_cost_bps: float = 0.5  # 基础成本:佣金 + spread 一半
    ) -> ExecutionResult:
        """
        计算订单执行的实际成本
        
        Args:
            order_side: 交易方向
            order_qty: 订单数量
            bids: 买盘深度 [[price, volume], ...]
            asks: 卖盘深度 [[price, volume], ...]
            decision_price: 决策价格(用于计算滑点)
            base_cost_bps: 基础成本(基点)
            
        Returns:
            ExecutionResult: 包含成交均价、滑点等信息
        """
        if order_side == "buy":
            book = asks  # 买入消耗卖盘
            if decision_price is None:
                decision_price = asks[0].price if asks else 0
        else:
            book = bids  # 卖出消耗买盘
            if decision_price is None:
                decision_price = bids[0].price if bids else 0
        
        remaining_qty = order_qty
        total_cost = 0.0
        filled_qty = 0.0
        levels_consumed = 0
        
        for level in book:
            if remaining_qty <= 0:
                break
                
            consume_qty = min(remaining_qty, level.volume)
            total_cost += consume_qty * level.price
            filled_qty += consume_qty
            remaining_qty -= consume_qty
            levels_consumed += 1
        
        if filled_qty == 0:
            # 无法成交
            return ExecutionResult(
                avg_price=0,
                slippage_bps=float('inf'),
                filled_qty=0,
                remaining_qty=order_qty,
                levels_consumed=0
            )
        
        avg_price = total_cost / filled_qty
        
        # 滑点计算:相对于决策价格的偏差
        slippage_bps = (avg_price - decision_price) / decision_price * 10000
        
        if order_side == "sell":
            slippage_bps = -slippage_bps  # 卖出时价格下跌为负滑点
        
        # 加上基础成本
        slippage_bps += base_cost_bps
        
        return ExecutionResult(
            avg_price=avg_price,
            slippage_bps=slippage_bps,
            filled_qty=filled_qty,
            remaining_qty=remaining_qty,
            levels_consumed=levels_consumed
        )
    
    def calculate_with_market_state_adjustment(
        self,
        order_side: str,
        order_qty: float,
        bids: List[OrderBookLevel],
        asks: List[OrderBookLevel],
        decision_price: float,
        volatility: float = None,
        market_state: str = "normal"
    ) -> ExecutionResult:
        """
        根据市场状态动态调整冲击系数后计算执行成本
        """
        # 动态冲击系数
        state_coefficients = {
            "normal": 0.12,
            "volatile": 0.25,
            "crisis": 0.50,
            "earnings": 0.35  # 财报前后
        }
        
        self.impact_coefficient = state_coefficients.get(market_state, 0.15)
        
        result = self.calculate_execution(
            order_side=order_side,
            order_qty=order_qty,
            bids=bids,
            asks=asks,
            decision_price=decision_price,
            base_cost_bps=0.5
        )
        
        # 加入波动率调整
        if volatility and volatility > 0.02:
            result.slippage_bps *= (1 + volatility * 5)
            
        return result


def demo_impact_calculation():
    """演示:不同订单量下的冲击成本"""
    calculator = ImpactCostCalculator(impact_coefficient=0.15)
    
    # 模拟某时刻 BTC 订单簿
    bids = [
        OrderBookLevel(price=50000.0, volume=2.5),
        OrderBookLevel(price=49900.0, volume=5.0),
        OrderBookLevel(price=49800.0, volume=8.0),
        OrderBookLevel(price=49700.0, volume=12.0),
        OrderBookLevel(price=49600.0, volume=15.0),
    ]
    asks = [
        OrderBookLevel(price=50100.0, volume=3.0),
        OrderBookLevel(price=50200.0, volume=6.0),
        OrderBookLevel(price=50300.0, volume=10.0),
        OrderBookLevel(price=50400.0, volume=14.0),
        OrderBookLevel(price=50500.0, volume=18.0),
    ]
    
    decision_price = 50000.0  # 假设你在 50000 决定买入
    
    print("=" * 60)
    print(f"{'订单量':<12}{'成交均价':<12}{'滑点(bps)':<12}{'消耗档位':<10}{'未成交量':<12}")
    print("=" * 60)
    
    for qty in [1, 5, 10, 25, 50]:
        result = calculator.calculate_execution(
            order_side="buy",
            order_qty=qty,
            bids=bids,
            asks=asks,
            decision_price=decision_price
        )
        print(f"{qty:<12}{result.avg_price:<12.2f}{result.slippage_bps:<12.2f}{result.levels_consumed:<10}{result.remaining_qty:<12.2f}")
    
    print("=" * 60)


demo_impact_calculation()

输出示例:

============================================================
订单量        成交均价      滑点(bps)    消耗档位      未成交量      
============================================================
1            50100.00     100.00      1            0.00         
5            50150.00     150.00      2            0.00         
10           50233.33     233.33      3            0.00         
25           50328.57     328.57      4            0.00         
50           0.00         inf         0            50.00        
============================================================

可以看到:5 BTC 时滑点约 150 bps(0.15%),10 BTC 时跳到 233 bps,50 BTC 根本无法在当前深度内成交——这就是大订单在实盘中的困境。

4.2 批量订单簿回放回测

单个时刻的计算意义有限,真正的回测需要批量回放——对历史上每个信号触发时刻的订单簿进行回放,计算累计冲击成本:

from datetime import datetime, timedelta
from typing import List, Dict
import statistics

class OrderBookReplayBacktester:
    """
    基于订单簿回放的回测引擎
    
    核心流程:
    1. 加载信号事件列表(时间戳 + 标的 + 方向 + 数量)
    2. 对每个事件,获取当时的历史订单簿快照
    3. 用 ImpactCostCalculator 计算实际执行成本
    4. 汇总累计收益 vs 无冲击成本假设的收益
    """
    
    def __init__(self, depth_client, impact_calculator: ImpactCostCalculator):
        self.depth_client = depth_client
        self.impact_calculator = impact_calculator
        
    def run_backtest(
        self,
        signals: List[Dict],
        market_state_fn=None  # fn(timestamp) -> str,返回市场状态
    ) -> Dict:
        """
        运行回放回测
        
        Args:
            signals: 信号列表,每项包含 timestamp, symbol, side, qty, decision_price
            market_state_fn: 可选,返回某个时间点的市场状态
            
        Returns:
            回测结果统计
        """
        results = []
        
        for signal in signals:
            timestamp = signal["timestamp"]
            symbol = signal["symbol"]
            side = signal["side"]
            qty = signal["qty"]
            decision_price = signal["decision_price"]
            
            # 获取当时订单簿
            depth_data = self.depth_client.get_historical_depth(
                symbol=symbol,
                timestamp=timestamp,
                limit=20
            )
            
            if not depth_data:
                continue
                
            bids = [
                OrderBookLevel(price=float(b[0]), volume=float(b[1]))
                for b in depth_data.get("bids", [])
            ]
            asks = [
                OrderBookLevel(price=float(a[0]), volume=float(a[1]))
                for a in depth_data.get("asks", [])
            ]
            
            if not bids or not asks:
                continue
            
            # 确定市场状态
            market_state = "normal"
            if market_state_fn:
                market_state = market_state_fn(timestamp)
            
            # 计算实际执行成本
            result = self.impact_calculator.calculate_with_market_state_adjustment(
                order_side=side,
                order_qty=qty,
                bids=bids,
                asks=asks,
                decision_price=decision_price,
                market_state=market_state
            )
            
            # 计算 PnL(简化版本)
            if result.filled_qty > 0:
                # 假设 1 天后以决策价格平仓(忽略其他成本)
                pnl_if_no_impact = 0
                pnl_actual = -result.slippage_bps * qty * decision_price / 10000
                
                results.append({
                    "timestamp": timestamp,
                    "symbol": symbol,
                    "slippage_bps": result.slippage_bps,
                    "filled_qty": result.filled_qty,
                    "pnl_no_impact": pnl_if_no_impact,
                    "pnl_actual": pnl_actual
                })
        
        # 统计汇总
        if not results:
            return {"error": "No valid results"}
            
        slippage_list = [r["slippage_bps"] for r in results if r["slippage_bps"] != float('inf')]
        
        return {
            "total_signals": len(signals),
            "valid_executions": len(results),
            "avg_slippage_bps": statistics.mean(slippage_list) if slippage_list else 0,
            "max_slippage_bps": max(slippage_list) if slippage_list else 0,
            "slippage_std": statistics.stdev(slippage_list) if len(slippage_list) > 1 else 0,
            "total_cost_bps": sum(slippage_list) if slippage_list else 0,
            "worst_case": max(results, key=lambda x: x["slippage_bps"]) if results else None,
            "individual_results": results
        }


# 使用示例
def demo_batch_backtest():
    """
    模拟:对历史 10 个信号进行订单簿回放回测
    """
    import os
    
    api_key = os.environ.get("TICKDB_API_KEY")
    depth_client = TickDBHistoricalDepth(api_key)
    calculator = ImpactCostCalculator(impact_coefficient=0.15)
    backtester = OrderBookReplayBacktester(depth_client, calculator)
    
    # 模拟信号列表(实际应用中从策略信号数据库读取)
    signals = [
        {
            "timestamp": 1708017600000,  # 2024-02-15 16:00:00 UTC
            "symbol": "BTC.USDT",
            "side": "buy",
            "qty": 5.0,
            "decision_price": 50000.0
        },
        {
            "timestamp": 1708021200000,  # 2024-02-15 17:00:00 UTC
            "symbol": "BTC.USDT",
            "side": "buy",
            "qty": 3.0,
            "decision_price": 50200.0
        },
        # ... 更多信号
    ]
    
    # 市场状态函数(简化版,实际需要结合波动率、成交量等因素)
    def market_state_fn(timestamp: int) -> str:
        hour = (timestamp // 3600000) % 24
        if 9 <= hour < 16:
            return "normal"
        elif 21 <= hour or hour < 4:
            return "volatile"
        else:
            return "normal"
    
    try:
        report = backtester.run_backtest(signals, market_state_fn)
        
        print("=" * 60)
        print("订单簿回放回测报告")
        print("=" * 60)
        print(f"总信号数: {report['total_signals']}")
        print(f"有效执行: {report['valid_executions']}")
        print(f"平均滑点: {report['avg_slippage_bps']:.2f} bps")
        print(f"最大滑点: {report['max_slippage_bps']:.2f} bps")
        print(f"滑点标准差: {report['slippage_std']:.2f} bps")
        print(f"累计成本: {report['total_cost_bps']:.2f} bps")
        print("=" * 60)
        
    except Exception as e:
        print(f"回测失败: {e}")


# demo_batch_backtest()

五、tick 级回测 vs Bar 级回测

5.1 粒度决定精度

回测精度与数据粒度直接相关。不同粒度的回测,对冲击成本的处理能力差异巨大:

回测粒度 数据内容 冲击成本处理 精度等级
Bar 级 OHLCV、收盘价 无,或用固定比例估算
分钟级 分钟 K 线 假设内部分布,简单估算 ⭐⭐
tick 级 每笔成交价 基于订单簿动态估算 ⭐⭐⭐
订单簿级 档位快照 精确回放每一档 ⭐⭐⭐⭐

Bar 级回测的致命缺陷:你不知道在那一根 K 线的生命周期内发生了什么。价格可能在开盘后立即跳空,也可能在收盘前一直横盘。你的订单在任何情况下都假设以"某个价格"成交,这是对市场微观结构的完全无视。

5.2 tick 级回测的实现要点

真正的 tick 级回测需要:

  1. tick 数据序列:每笔成交的时间戳、价格、成交量
  2. 订单簿快照序列:每个时刻的盘口深度(可选,但更准确)
  3. 信号触发与订单执行的时间序列:区分"信号时间"和"实际成交时间"
  4. 限速模型:大订单分批成交的时间分布
class TickLevelBacktester:
    """
    tick 级回测引擎
    
    核心改进:
    1. 信号触发后,模拟订单成交的时间延迟
    2. 每个时刻用当时的最佳买卖盘计算实际成交价
    3. 大订单分批成交,逐批计算冲击成本
    """
    
    def __init__(self, depth_client, impact_calculator):
        self.depth_client = depth_client
        self.impact_calculator = impact_calculator
        
    def simulate_large_order(
        self,
        symbol: str,
        side: str,
        total_qty: float,
        start_timestamp: int,
        time_window_ms: int = 60000,  # 1 分钟内分批成交
        num_slices: int = 5
    ) -> Dict:
        """
        模拟大订单分批成交
        
        Args:
            symbol: 交易品种
            side: buy/sell
            total_qty: 总订单量
            start_timestamp: 开始时间戳
            time_window_ms: 时间窗口(毫秒)
            num_slices: 分批数
        """
        slice_qty = total_qty / num_slices
        slice_interval = time_window_ms / num_slices
        
        results = []
        total_slippage = 0.0
        total_cost = 0.0
        
        for i in range(num_slices):
            slice_time = start_timestamp + i * slice_interval
            
            # 获取该时刻的订单簿
            depth_data = self.depth_client.get_historical_depth(
                symbol=symbol,
                timestamp=slice_time,
                limit=20
            )
            
            if not depth_data:
                continue
            
            bids = [
                OrderBookLevel(price=float(b[0]), volume=float(b[1]))
                for b in depth_data.get("bids", [])
            ]
            asks = [
                OrderBookLevel(price=float(a[0]), volume=float(a[1]))
                for a in depth_data.get("asks", [])
            ]
            
            # 用该时刻的最佳价格作为决策价
            decision_price = asks[0].price if side == "buy" else bids[0].price
            
            result = self.impact_calculator.calculate_execution(
                order_side=side,
                order_qty=slice_qty,
                bids=bids,
                asks=asks,
                decision_price=decision_price
            )
            
            if result.filled_qty > 0:
                total_slippage += result.slippage_bps * result.filled_qty
                total_cost += result.avg_price * result.filled_qty
                
                results.append({
                    "slice": i + 1,
                    "timestamp": slice_time,
                    "filled_qty": result.filled_qty,
                    "avg_price": result.avg_price,
                    "slippage_bps": result.slippage_bps
                })
        
        if not results:
            return {"error": "No fills"}
        
        total_filled = sum(r["filled_qty"] for r in results)
        avg_slippage = total_slippage / total_filled if total_filled > 0 else 0
        avg_price = total_cost / total_filled if total_filled > 0 else 0
        
        return {
            "total_qty": total_qty,
            "filled_qty": total_filled,
            "remaining_qty": total_qty - total_filled,
            "num_slices": len(results),
            "avg_price": avg_price,
            "avg_slippage_bps": avg_slippage,
            "slice_results": results
        }


# 使用示例
def demo_tick_backtest():
    """演示:50 BTC 大单分 5 批成交 vs 一次性成交的滑点对比"""
    
    calculator = ImpactCostCalculator(impact_coefficient=0.15)
    
    # 模拟订单簿随时间变化(简化:假设订单簿逐渐变深)
    asks_timeline = [
        [OrderBookLevel(50000, 3), OrderBookLevel(50100, 6), OrderBookLevel(50200, 10)],
        [OrderBookLevel(50010, 4), OrderBookLevel(50110, 7), OrderBookLevel(50210, 12)],
        [OrderBookLevel(50020, 5), OrderBookLevel(50120, 9), OrderBookLevel(50220, 15)],
        [OrderBookLevel(50030, 6), OrderBookLevel(50130, 10), OrderBookLevel(50230, 18)],
        [OrderBookLevel(50040, 8), OrderBookLevel(50140, 12), OrderBookLevel(50240, 20)],
    ]
    
    print("=" * 70)
    print("大单分批成交 vs 一次性成交:滑点对比")
    print("=" * 70)
    
    # 一次性成交(模拟 Bar 级回测)
    bids = []
    asks一次性 = asks_timeline[0]
    result_once = calculator.calculate_execution(
        order_side="buy",
        order_qty=10,
        bids=bids,
        asks=asks一次性,
        decision_price=50000
    )
    
    # 分 5 批成交(tick 级模拟)
    total_slippage_weighted = 0
    total_qty = 0
    for asks in asks_timeline:
        result = calculator.calculate_execution(
            order_side="buy",
            order_qty=2,
            bids=[],
            asks=asks,
            decision_price=asks[0].price
        )
        if result.filled_qty > 0:
            total_slippage_weighted += result.slippage_bps * result.filled_qty
            total_qty += result.filled_qty
    
    avg_slippage_sliced = total_slippage_weighted / total_qty if total_qty > 0 else 0
    
    print(f"一次性成交滑点: {result_once.slippage_bps:.2f} bps")
    print(f"分 5 批成交平均滑点: {avg_slippage_sliced:.2f} bps")
    print(f"分批策略节省: {result_once.slippage_bps - avg_slippage_sliced:.2f} bps ({(result_once.slippage_bps - avg_slippage_sliced) / result_once.slippage_bps * 100:.1f}%)")
    print("=" * 70)


demo_tick_backtest()

输出示例:

======================================================================
大单分批成交 vs 一次性成交:滑点对比
======================================================================
一次性成交滑点: 200.00 bps
分 5 批成交平均滑点: 126.67 bps
分批策略节省: 73.33 bps (36.7%)
======================================================================

分批成交相比一次性成交,可以节省约 37% 的冲击成本。这验证了"订单拆分"策略在工程层面的合理性——不只是心理安慰,是真实的经济效益。


六、实战:构建你的冲击成本感知回测框架

6.1 架构总览

一个完整的冲击成本感知回测框架应该包含以下组件:

┌─────────────────────────────────────────────────────────────────┐
│                     信号层 (Signals)                            │
│   策略产生的交易信号:标的、方向、数量、决策价格、触发时间       │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                  订单簿回放层 (Order Book Replay)               │
│   根据触发时间获取历史订单簿快照,或实时订阅深度数据             │
│   数据源:TickDB depth API / WebSocket                          │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│               冲击成本计算层 (Impact Engine)                     │
│   - ImpactCostCalculator: 单笔订单执行成本                       │
│   - OrderSlicer: 大单拆分逻辑                                   │
│   - MarketStateDetector: 市场状态识别(影响冲击系数)            │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                   回测引擎 (Backtester)                         │
│   - OrderBookReplayBacktester: 基于订单簿的回放回测              │
│   - TickLevelBacktester: tick 级精细回测                        │
│   - 汇总 PnL、胜率、滑点分布、最大单笔滑点                       │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                   报告层 (Reports)                              │
│   对比"无冲击成本"与"有冲击成本"的收益曲线差异                  │
│   输出滑点分布直方图、最差案例分析                               │
└─────────────────────────────────────────────────────────────────┘

6.2 配置建议

参数 推荐值 说明
冲击系数(正常) 0.12 - 0.15 流动性充足时段
冲击系数(波动) 0.25 - 0.35 非农、CPI 发布等
冲击系数(危机) 0.50+ VIX > 30
基础成本 0.5 - 1.0 bps 佣金 + spread
单笔最大下单比例 日均成交量的 5% 超过需分批
分批间隔 10-60 秒 根据策略频率调整

6.3 回测局限性说明

即使使用了订单簿回放和 tick 级回测,回测结果仍然存在以下局限:

  1. 历史订单簿不完整:大多数数据源不保存完整的 tick-by-tick 订单簿快照,只能用有限档位估算。
  2. 你的存在会改变订单簿:回测假设订单簿结构不变,但实盘中大资金建仓本身会"吓跑"流动性提供商,导致订单簿变薄。
  3. 市场冲击非对称:卖出时的影响通常大于买入,因为市场存在"追涨杀跌"的正反馈效应。
  4. 滑点统计分布未知:滑点不是正态分布,存在肥尾,极端情况下滑点可能是均值的 5-10 倍。

建议:回测结果乘以 0.7 作为保守估计,并在实盘中设置"滑点容忍阈值",超过后自动撤单或分批。


结语

滑点是回测与实盘之间最隐蔽的裂缝。它不是"运气不好",而是市场微观结构的必然产物。当你的策略资金量越大、交易频率越高、与大众方向越一致,滑点对你的收益侵蚀就越显著。

本文的核心方法论

  • 用订单簿回放替代固定滑点假设
  • 用冲击成本模型替代简单的百分比估算
  • 用 tick 级粒度替代 Bar 级粒度
  • 用动态冲击系数替代静态参数

如果你还在用"假设滑点 0.1%"来回测,是时候改变了。


下一步行动

如果你想亲手实现本文框架

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 用你的策略信号数据替换示例信号,验证真实滑点分布

如果你需要tick级历史数据做深度回测
联系 [email protected] 了解专业版数据方案,包含完整的订单簿快照序列和成交明细。

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询历史深度数据。


风险提示:本文不构成任何投资建议。回测结果基于历史数据模拟,不代表未来实际收益。实盘交易存在滑点、执行延迟、流动性枯竭等风险因素,请谨慎评估。