当市场分裂时,价差会说话

2008 年 9 月 16 日,雷曼兄弟申请破产后的第三天。大量机构投资者被迫去杠杆化,清算所有可变现资产。在这场集体性抛售中,一只名为 "LEH" 的股票——雷曼兄弟——在 30 分钟内跌去近半,与此同时,大量与之毫无业务关联的股票也在同步暴跌。

但一位遵循特定规则的量化交易者此刻看到的是机会,而非恐慌。

她知道,LEH 与同板块的其他股票之间并没有经济意义上的联动。它们被同一波流动性危机裹挟,而不是被同一条基本面的逻辑线牵引。这种"假性联动"是均值回归的燃料。而均值回归,正是统计套利的基石。

这不是 2008 年独有的现象。2021 年 1 月 GameStop 逼空期间,大量" meme 股"同步异动;2022 年美联储激进加息时,高股息板块与成长板块的负相关短暂失效——这些时刻都在重复同一个规律:系统性风险会制造假性联动,摧毁相关但摧毁不了协整

本文拆解统计套利中最经典的策略——协整配对交易——的完整工程实现:从数千只美股中筛选协整对,到实时监控价差并生成交易信号,再到如何用 TickDB 的历史 K 线数据完成策略回测验证。


一、为什么是协整,不是相关

在动手之前,必须把最重要的概念理清楚——协整(Cointegration)和相关(Correlation)不是一回事。

相关性衡量的是两个序列变化趋势是否一致。X 涨,Y 也倾向于涨,它们就是相关的。但这种关系是脆弱的:如果市场情绪逆转,两个相关资产可以同步下跌,毫无"纠偏"机制。

协整性描述的是一种更深层的约束关系:两个(或多个)变量在短期内可能各自游走,但它们之间的线性组合是一个平稳序列。换句话说,偏离是暂时的,系统有一股"引力"把它们拉回均衡位置。

用一个经典比喻:一只喝醉的人和一只狗同时走路。喝醉的人步伐和狗的步伐高度相关——都在随机游走。但喝醉的人和狗之间的距离呢?那个距离是均值回归的,因为两个人/狗最终都会朝家的方向走。这个"距离"就是协整关系。

属性 相关性 协整性
衡量对象 收益率的同向性 价差的平稳性
稳定性 时变,市场结构改变后可能失效 更深层的经济联系,稳定性更强
适用场景 趋势类策略 均值回归类策略
计算方法 Pearson 相关系数 Engle-Granger / Johansen 检验
是否可套利 不直接

对于配对交易而言,我们追求的正是协整性而非相关性。两只股票价格"一起涨"不构成交易理由,但"它们之间存在平稳的价差序列"才是。


二、协整检验:从数千只股票到候选配对

2.1 筛选框架

面对数千只美股,穷举所有配对是不现实的(n 只股票需要 n(n-1)/2 对,全部检验成本极高)。需要一个多阶段筛选流程:

阶段一:预处理
    ├── 过滤低流动性标的(日均成交量 < 50 万股)
    ├── 过滤高波动异常值(日收益率标准差 > 5% 的标的排除)
    └── 统一价格量纲(取对数收益率)

阶段二:粗筛——距离法
    ├── 计算历史价格序列之间的欧氏距离(或标准化距离)
    ├── 取距离最小的 Top-N 候选对(如 Top 500)
    └── 理由:协整序列的距离通常较小

阶段三:精筛——统计检验
    ├── 对候选对逐一进行 ADF 检验(协整检验的核心)
    ├── 记录 p-value、临界值、平稳性结论
    └── 选取 p-value < 0.05 的配对

2.2 ADF 检验原理

Augmented Dickey-Fuller(ADF)检验是协整检验的基石。它检验以下假设:

  • H0(零假设):序列存在单位根,即非平稳
  • H1(备择假设):序列是平稳的(或者说不存在单位根)

若拒绝 H0,则价差序列是平稳的——协整关系成立。

ADF 检验的核心回归方程:

$$\Delta y_t = \alpha + \beta t + \gamma y_{t-1} + \sum_{i=1}^{p} \delta_i \Delta y_{t-i} + \epsilon_t$$

检验的核心是 $\gamma = 0$ 是否成立。若 $\gamma \neq 0$,则 $y_{t-1}$ 对序列的平稳性有贡献,即协整成立。

2.3 候选对筛选实现

以下代码展示从标的池到候选协整对的完整筛选流程:

import os
import json
import time
import random
import numpy as np
import pandas as pd
from scipy import stats
from statsmodels.tsa.stattools import adfuller

# ─────────────────────────────────────────────
# TickDB 历史 K 线数据获取
# ─────────────────────────────────────────────
def fetch_historical_klines(symbol: str, interval: str = "1d", limit: int = 500) -> pd.DataFrame:
    """
    从 TickDB 获取指定标的的历史 K 线数据。
    interval: 1m/5m/1h/4h/1d/week/month
    返回 DataFrame,列:[timestamp, open, high, low, close, volume]
    """
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")

    import requests
    url = "https://api.tickdb.ai/v1/market/kline"
    headers = {"X-API-Key": api_key}
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit
    }

    try:
        response = requests.get(
            url,
            headers=headers,
            params=params,
            timeout=(3.05, 10)
        )

        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 5))
            raise RateLimitError(f"限频,等待 {retry_after} 秒")

        data = response.json()
        if data.get("code") == 3001:
            retry_after = int(response.headers.get("Retry-After", 5))
            raise RateLimitError(f"限频,等待 {retry_after} 秒")
        if data.get("code") != 0:
            raise RuntimeError(f"API 错误 {data.get('code')}: {data.get('message')}")

        klines = data["data"]
        df = pd.DataFrame(klines)
        # 字段映射
        df["timestamp"] = pd.to_datetime(df["ts"], unit="ms")
        df = df[["timestamp", "open", "high", "low", "close", "volume"]]
        df = df.sort_values("timestamp").reset_index(drop=True)
        return df

    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"网络请求失败: {e}")


class RateLimitError(Exception):
    """限频异常,用于触发重连退避"""
    pass


def fetch_multiple_symbols(symbols: list, interval: str = "1d", limit: int = 500, max_retries: int = 3) -> dict:
    """
    批量获取多只标的的 K 线数据。
    ⚠️ 高频调用需配合限频退避逻辑。
    """
    results = {}
    base_delay = 2.0
    max_delay = 60.0

    for i, symbol in enumerate(symbols):
        retry_count = 0
        while retry_count < max_retries:
            try:
                df = fetch_historical_klines(symbol, interval, limit)
                results[symbol] = df
                break
            except RateLimitError as e:
                wait_time = int(str(e).split("等待 ")[-1].rstrip(" 秒"))
                print(f"[{symbol}] 触发限频,等待 {wait_time} 秒")
                time.sleep(wait_time)
                retry_count += 1
            except Exception as e:
                print(f"[{symbol}] 获取失败: {e}")
                results[symbol] = None
                break

        # 指数退避 + 抖动:避免大量请求在同一时刻发起
        delay = min(base_delay * (2 ** retry_count), max_delay)
        jitter = random.uniform(0, delay * 0.1)
        time.sleep(delay + jitter)

    return results


# ─────────────────────────────────────────────
# 协整检验核心函数
# ─────────────────────────────────────────────
def cointegration_test(series1: pd.Series, series2: pd.Series) -> dict:
    """
    对两个价格序列进行协整检验。
    返回检验结果字典,包含 ADF 统计量、p-value、临界值和结论。
    """
    if len(series1) != len(series2):
        raise ValueError("两个序列长度必须相同")

    # Engle-Granger 两步法:
    # 第一步:用 OLS 回归 y = α + β * x + ε,求出残差序列 ε(即价差 spread)
    slope, intercept, _, _, _ = stats.linregress(series2, series1)
    spread = series1 - (intercept + slope * series2)

    # 第二步:对残差序列 ε 做 ADF 检验
    adf_result = adfuller(spread, maxlag=1, regression="c")

    adf_stat = adf_result[0]
    p_value = adf_result[1]
    critical_values = adf_result[4]

    # 临界值取 5% 显著性水平
    is_cointegrated = p_value < 0.05 and adf_stat < critical_values["5%"]

    return {
        "spread_mean": spread.mean(),
        "spread_std": spread.std(),
        "hedge_ratio": slope,
        "intercept": intercept,
        "adf_statistic": adf_stat,
        "p_value": p_value,
        "critical_values": critical_values,
        "is_cointegrated": is_cointegrated,
        "sample_size": len(spread)
    }


def screen_pairs(universe: list, interval: str = "1d", lookback: int = 500,
                 top_candidate: int = 200) -> pd.DataFrame:
    """
    从股票池中筛选候选协整对。
    
    参数:
        universe: 标的代码列表,如 ["AAPL.US", "MSFT.US"]
        interval: K 线周期
        lookback: 历史数据量
        top_candidate: 粗筛阶段保留的候选对数量
    """
    print(f"正在获取 {len(universe)} 只标的的历史数据...")
    all_prices = fetch_multiple_symbols(universe, interval, limit=lookback)
    print(f"数据获取完成,有效标的: {sum(1 for v in all_prices.values() if v is not None)} 只")

    # 构建价格矩阵(收盘价)
    price_data = {}
    for symbol, df in all_prices.items():
        if df is not None and len(df) >= lookback * 0.9:
            price_data[symbol] = df["close"].values

    symbols = list(price_data.keys())
    print(f"进入粗筛阶段,共 {len(symbols)} 只有效标的")

    # 粗筛:计算标准化价格距离,保留 Top-N 候选对
    n = len(symbols)
    distances = []

    for i in range(n):
        for j in range(i + 1, n):
            s1 = symbols[i]
            s2 = symbols[j]

            # 标准化后计算欧氏距离
            p1 = (price_data[s1] - price_data[s1].mean()) / (price_data[s1].std() + 1e-8)
            p2 = (price_data[s2] - price_data[s2].mean()) / (price_data[s2].std() + 1e-8)
            dist = np.sqrt(np.mean((p1 - p2) ** 2))

            distances.append({
                "symbol1": s1,
                "symbol2": s2,
                "distance": dist
            })

    # 按距离升序排列,取前 top_candidate
    distances_df = pd.DataFrame(distances)
    distances_df = distances_df.sort_values("distance").head(top_candidate)
    print(f"粗筛完成,保留 {len(distances_df)} 对候选对")

    # 精筛:对候选对逐一做 ADF 协整检验
    results = []
    for _, row in distances_df.iterrows():
        s1_prices = pd.Series(price_data[row["symbol1"]])
        s2_prices = pd.Series(price_data[row["symbol2"]])

        try:
            result = cointegration_test(s1_prices, s2_prices)
            result["symbol1"] = row["symbol1"]
            result["symbol2"] = row["symbol2"]
            results.append(result)
        except Exception as e:
            print(f"协整检验失败 {row['symbol1']}/{row['symbol2']}: {e}")

    results_df = pd.DataFrame(results)

    # 过滤出通过协整检验的配对
    valid_pairs = results_df[results_df["is_cointegrated"] == True].copy()
    valid_pairs = valid_pairs.sort_values("p_value").reset_index(drop=True)

    print(f"\n协整检验完成,共 {len(valid_pairs)} 对通过检验 (p < 0.05)")
    print(f"Top 5 候选配对(按 p-value 排序):")
    print(valid_pairs[["symbol1", "symbol2", "p_value", "hedge_ratio"]].head())

    return valid_pairs


# ─────────────────────────────────────────────
# 示例:运行筛选流程
# ─────────────────────────────────────────────
if __name__ == "__main__":
    # 注意:实盘中 universe 应为完整的可交易标的列表
    # 这里以几只代表性股票作为演示
    demo_universe = [
        "AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US",
        "XOM.US", "CVX.US", "JPM.US", "BAC.US",
        "JNJ.US", "PFE.US", "UNH.US", "MRK.US"
    ]

    candidate_pairs = screen_pairs(
        universe=demo_universe,
        interval="1d",
        lookback=500,
        top_candidate=30
    )

运行以上脚本后,会输出类似以下格式的候选配对表格:

symbol1 symbol2 p_value adf_statistic hedge_ratio spread_std
XOM.US CVX.US 0.0023 -3.894 1.231 2.14
JNJ.US PFE.US 0.0087 -3.452 0.876 1.89
JPM.US BAC.US 0.0142 -3.218 1.089 3.05

其中,XOM(埃克森美孚)与 CVX(雪佛龙)构成一对经典的能源板块配对——两者业务高度同质,原油价格是共同驱动因子,均值回归特性稳定。


三、价差 Z-Score:信号从何而来

找到协整对只是第一步。交易的核心在于:价差偏离均衡多少个标准差时入场?偏离多少时离场?

这就要引入 Z-Score(标准分数):

$$Z_t = \frac{Spread_t - \mu_{spread}}{\sigma_{spread}}$$

其中 $\mu_{spread}$ 和 $\sigma_{spread}$ 通常用滚动窗口估算,而非全历史均值(因为协整关系可能缓慢漂移)。

3.1 经典阈值设置

Z-Score 区间 信号含义 操作
Z > +2.0 价差偏高,上轨突破 卖出 symbol1,买入 symbol2(空头对冲)
Z < -2.0 价差偏低,下轨突破 买入 symbol1,卖出 symbol2(多头对冲)
Z ∈ (-0.5, +0.5) 价差接近均衡 平仓,持有现金
Z 回归至 ±0.5 止盈信号 价差收敛时锁定利润
Z 突破 ±3.0 止损信号 极端偏离,市场逻辑可能已变化

这些阈值不是固定的。不同资产类别、不同波动环境下,最优阈值差异显著。下一节中的回测模块提供了参数优化的框架。

3.2 Z-Score 实时计算

class SpreadMonitor:
    """
    实时监控协整对的价差 Z-Score。
    适用于 WebSocket 推送的 tick 数据(港股、数字货币)。
    ⚠️ 美股逐笔成交不支持,请使用 TickDB /kline/latest 端点获取实时 K 线。
    """

    def __init__(self, symbol1: str, symbol2: str,
                 hedge_ratio: float, intercept: float,
                 window: int = 20):
        self.symbol1 = symbol1
        self.symbol2 = symbol2
        self.hedge_ratio = hedge_ratio
        self.intercept = intercept
        self.window = window

        # 滚动均值和标准差的历史缓冲区
        self.price1_buffer = []
        self.price2_buffer = []
        self.spread_history = []

        # Z-Score 阈值
        self.entry_threshold = 2.0
        self.exit_threshold = 0.5
        self.stop_loss_threshold = 3.0

    def update(self, price1: float, price2: float) -> dict:
        """
        更新最新价格,计算实时 Z-Score。
        返回信号字典,包含 Z-Score、信号类型和建议操作。
        """
        self.price1_buffer.append(price1)
        self.price2_buffer.append(price2)

        # 维持滚动窗口大小
        if len(self.price1_buffer) > self.window * 2:
            self.price1_buffer.pop(0)
            self.price2_buffer.pop(0)

        if len(self.price1_buffer) < self.window:
            return {"status": "warming_up", "z_score": None}

        # 计算当前价差
        current_spread = price1 - (self.intercept + self.hedge_ratio * price2)

        # 计算滚动均值和标准差
        recent_spread_history = self.spread_history[-self.window:] if self.spread_history else []
        if len(recent_spread_history) < self.window:
            self.spread_history.append(current_spread)
            return {"status": "warming_up", "z_score": None}

        spread_mean = np.mean(recent_spread_history)
        spread_std = np.std(recent_spread_history) + 1e-8

        z_score = (current_spread - spread_mean) / spread_std

        # 更新价差历史
        self.spread_history.append(current_spread)
        if len(self.spread_history) > self.window * 3:
            self.spread_history.pop(0)

        # 生成交易信号
        signal = self._generate_signal(z_score)

        return {
            "status": "ready",
            "z_score": round(z_score, 3),
            "spread": round(current_spread, 4),
            "signal": signal,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

    def _generate_signal(self, z_score: float) -> dict:
        """根据 Z-Score 生成交易信号"""
        if z_score > self.stop_loss_threshold:
            return {
                "action": "STOP_LOSS",
                "direction": "short_spread",
                "description": f"Z={z_score:.2f} 突破止损阈值 {self.stop_loss_threshold},强制平仓"
            }
        elif z_score > self.entry_threshold:
            return {
                "action": "ENTRY",
                "direction": "short_spread",
                "description": f"Z={z_score:.2f} > +{self.entry_threshold},卖出 {self.symbol1},买入 {self.symbol2}"
            }
        elif z_score < -self.entry_threshold:
            return {
                "action": "ENTRY",
                "direction": "long_spread",
                "description": f"Z={z_score:.2f} < -{self.entry_threshold},买入 {self.symbol1},卖出 {self.symbol2}"
            }
        elif abs(z_score) < self.exit_threshold:
            return {
                "action": "EXIT",
                "direction": "neutral",
                "description": f"Z={z_score:.2f} 回归均衡,平仓"
            }
        else:
            return {
                "action": "HOLD",
                "direction": "neutral",
                "description": f"Z={z_score:.2f} 处于中性区间,观望"
            }


# ─────────────────────────────────────────────
# WebSocket 实时推送订阅(适用于港股、数字货币)
# ─────────────────────────────────────────────
import websocket
import threading
import queue

class RealtimeSpreadWatcher:
    """
    通过 WebSocket 订阅 TickDB 实时行情,驱动 SpreadMonitor。
    适用于支持 trades 频道的标的(港股、数字货币)。
    ⚠️ 美股不支持 tick 级逐笔,此处示例代码适用于港股/数字货币。
    """

    RECONNECT_BASE_DELAY = 2.0
    RECONNECT_MAX_DELAY = 60.0
    MAX_RECONNECT_ATTEMPTS = 10

    def __init__(self, symbols: list, spread_monitor: SpreadMonitor):
        self.symbols = symbols
        self.monitor = spread_monitor
        self.ws = None
        self.reconnect_attempts = 0
        self.latest_prices = {sym: None for sym in symbols}
        self._msg_queue = queue.Queue()
        self._running = False

    def start(self, api_key: str):
        """启动 WebSocket 连接"""
        self.api_key = api_key
        self._running = True
        self._connect()
        # 启动独立线程处理消息
        thread = threading.Thread(target=self._message_loop, daemon=True)
        thread.start()

    def _connect(self):
        """建立 WebSocket 连接"""
        try:
            symbols_param = ",".join(self.symbols)
            # ⚠️ API Key 以 URL 参数形式传递(TickDB WebSocket 规范)
            ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}&symbols={symbols_param}&channels=trades"

            self.ws = websocket.WebSocketApp(
                ws_url,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )

            thread = threading.Thread(target=self.ws.run_forever, daemon=True)
            thread.start()

        except Exception as e:
            print(f"WebSocket 连接建立失败: {e}")
            self._schedule_reconnect()

    def _on_open(self, ws):
        """连接建立成功"""
        print(f"[{time.strftime('%H:%M:%S')}] WebSocket 连接已建立,开始接收实时数据")
        self.reconnect_attempts = 0
        # 发送心跳保活(TickDB WebSocket 规范)
        ws.send(json.dumps({"cmd": "ping"}))

    def _on_message(self, ws, message):
        """接收并分发消息"""
        self._msg_queue.put(message)

    def _on_error(self, ws, error):
        """错误处理"""
        print(f"WebSocket 错误: {error}")

    def _on_close(self, ws, code, reason):
        """连接关闭后触发重连"""
        print(f"WebSocket 关闭 (code={code}, reason={reason})")
        self._schedule_reconnect()

    def _schedule_reconnect(self):
        """指数退避重连"""
        if not self._running:
            return

        delay = min(
            self.RECONNECT_BASE_DELAY * (2 ** self.reconnect_attempts),
            self.RECONNECT_MAX_DELAY
        )
        # 加抖动避免惊群
        jitter = random.uniform(0, delay * 0.1)
        wait_time = delay + jitter

        print(f"[{time.strftime('%H:%M:%S')}] {wait_time:.1f} 秒后尝试重连 (第 {self.reconnect_attempts + 1} 次)")
        time.sleep(wait_time)

        if self.reconnect_attempts < self.MAX_RECONNECT_ATTEMPTS:
            self.reconnect_attempts += 1
            self._connect()
        else:
            print("达到最大重连次数,请检查网络或 API Key")

    def _message_loop(self):
        """独立线程:处理消息队列,驱动 Z-Score 计算"""
        last_heartbeat = time.time()

        while self._running:
            try:
                message = self._msg_queue.get(timeout=1.0)
                data = json.loads(message)

                # 心跳保活:每 30 秒检测一次 ping/pong
                if time.time() - last_heartbeat > 30:
                    if self.ws and self.ws.sock and self.ws.sock.connected:
                        self.ws.send(json.dumps({"cmd": "ping"}))
                    last_heartbeat = time.time()

                # 处理 trades 频道数据
                if data.get("channel") == "trades" and "data" in data:
                    for trade in data["data"]:
                        symbol = trade.get("s") or trade.get("symbol")
                        price = float(trade.get("p") or trade.get("price"))

                        if symbol in self.latest_prices:
                            self.latest_prices[symbol] = price

                    # 当两个标的都有最新价格时,计算 Z-Score
                    p1 = self.latest_prices.get(self.monitor.symbol1)
                    p2 = self.latest_prices.get(self.monitor.symbol2)

                    if p1 is not None and p2 is not None:
                        result = self.monitor.update(p1, p2)
                        if result["status"] == "ready":
                            self._log_signal(result)

            except queue.Empty:
                continue
            except json.JSONDecodeError:
                continue

    def _log_signal(self, result: dict):
        """记录信号到控制台(生产环境应接入告警系统)"""
        signal = result["signal"]
        timestamp = result["timestamp"]

        if signal["action"] in ("ENTRY", "STOP_LOSS"):
            print(f"[{timestamp}] 🔔 {signal['action']} | "
                  f"Z={result['z_score']} | {signal['description']}")
        else:
            print(f"[{timestamp}] Z={result['z_score']} | {signal['description']}")

    def stop(self):
        """停止 WebSocket 连接"""
        self._running = False
        if self.ws:
            self.ws.close()

四、回测框架:让历史说话

策略写完了,但纸上谈兵没有意义。任何量化策略在上线前,必须经过回测验证。以下是一个完整的回测引擎:

class PairBacktester:
    """
    配对交易策略回测引擎。
    支持参数扫描(阈值优化)和多指标绩效评估。
    """

    def __init__(self, symbol1: str, symbol2: str,
                 hedge_ratio: float, intercept: float,
                 entry_threshold: float = 2.0,
                 exit_threshold: float = 0.5,
                 stop_loss: float = 3.0,
                 window: int = 20,
                 initial_capital: float = 100_000.0,
                 commission: float = 0.001,
                 slippage: float = 0.0005):
        self.symbol1 = symbol1
        self.symbol2 = symbol2
        self.hedge_ratio = hedge_ratio
        self.intercept = intercept
        self.entry_threshold = entry_threshold
        self.exit_threshold = exit_threshold
        self.stop_loss = stop_loss
        self.window = window
        self.initial_capital = initial_capital
        self.commission = commission  # 佣金率
        self.slippage = slippage      # 滑点率

    def run(self, df1: pd.DataFrame, df2: pd.DataFrame) -> dict:
        """
        执行回测。

        参数:
            df1, df2: 包含 timestamp, close, volume 列的 DataFrame
        返回:
            回测绩效指标字典
        """
        # 合并数据
        merged = pd.merge(
            df1[["timestamp", "close"]].rename(columns={"close": "p1"}),
            df2[["timestamp", "close"]].rename(columns={"close": "p2"}),
            on="timestamp", how="inner"
        ).reset_index(drop=True)

        # 计算价差
        merged["spread"] = (
            merged["p1"] - (self.intercept + self.hedge_ratio * merged["p2"])
        )

        # 计算 Z-Score
        merged["z_score"] = (
            (merged["spread"] - merged["spread"].rolling(self.window).mean())
            / (merged["spread"].rolling(self.window).std() + 1e-8)
        )

        # 模拟交易
        position = 0  # +1: long spread, -1: short spread, 0: flat
        entry_z = 0
        pnl_list = []
        equity = self.initial_capital
        equity_curve = []

        for i, row in merged.iterrows():
            if pd.isna(row["z_score"]):
                equity_curve.append(equity)
                continue

            z = row["z_score"]
            price1 = row["p1"]
            price2 = row["p2"]

            # 开仓逻辑
            if position == 0:
                if z > self.entry_threshold:
                    position = -1
                    entry_z = z
                    entry_price1 = price1 * (1 + self.slippage)
                    entry_price2 = price2 * (1 - self.slippage)
                elif z < -self.entry_threshold:
                    position = 1
                    entry_z = z
                    entry_price1 = price1 * (1 - self.slippage)
                    entry_price2 = price2 * (1 + self.slippage)

            # 平仓逻辑
            elif position != 0:
                should_exit = False

                # 止盈
                if abs(z) < self.exit_threshold:
                    should_exit = True
                # 止损
                if abs(z) > self.stop_loss:
                    should_exit = True

                if should_exit:
                    exit_price1 = price1 * (1 - self.slippage) if position == 1 else price1 * (1 + self.slippage)
                    exit_price2 = price2 * (1 + self.slippage) if position == 1 else price2 * (1 - self.slippage)

                    # 计算利润(按一股配对为单位)
                    # long spread: 盈利 = (p1上涨 - entry_p1) - hedge_ratio * (p2下跌 - entry_p2)
                    if position == 1:
                        pnl_per_unit = (exit_price1 - entry_price1) - self.hedge_ratio * (entry_price2 - exit_price2)
                    else:
                        pnl_per_unit = (entry_price1 - exit_price1) - self.hedge_ratio * (exit_price2 - entry_price2)

                    # 考虑佣金
                    trade_value = abs(entry_price1) + abs(entry_price2) * self.hedge_ratio
                    net_pnl = pnl_per_unit - trade_value * self.commission

                    equity += net_pnl
                    pnl_list.append(net_pnl)
                    position = 0

            equity_curve.append(equity)

        # 计算绩效指标
        returns = pd.Series(equity_curve).pct_change().dropna()
        equity_series = pd.Series(equity_curve)

        # 最大回撤
        rolling_max = equity_series.expanding().max()
        drawdowns = equity_series - rolling_max
        max_drawdown = drawdowns.min()
        max_drawdown_pct = (max_drawdown / rolling_max[drawdowns.idxmin()]) * 100

        # 夏普比率(年化)
        annual_return = returns.mean() * 252
        annual_vol = returns.std() * np.sqrt(252)
        sharpe = annual_return / annual_vol if annual_vol != 0 else 0.0

        # 索提诺比率
        downside_returns = returns[returns < 0]
        downside_vol = downside_returns.std() * np.sqrt(252)
        sortino = annual_return / downside_vol if downside_vol != 0 else 0.0

        # 胜率
        wins = sum(1 for p in pnl_list if p > 0)
        total_trades = len(pnl_list)
        win_rate = wins / total_trades if total_trades > 0 else 0.0

        # 平均盈利/平均亏损
        avg_win = np.mean([p for p in pnl_list if p > 0]) if wins > 0 else 0.0
        avg_loss = abs(np.mean([p for p in pnl_list if p < 0])) if (total_trades - wins) > 0 else 0.0
        profit_factor = avg_win / avg_loss if avg_loss != 0 else float("inf")

        return {
            "initial_capital": self.initial_capital,
            "final_equity": round(equity, 2),
            "total_return": round((equity - self.initial_capital) / self.initial_capital * 100, 2),
            "total_trades": total_trades,
            "win_rate": round(win_rate, 3),
            "profit_factor": round(profit_factor, 2),
            "avg_win": round(avg_win, 2),
            "avg_loss": round(avg_loss, 2),
            "sharpe_ratio": round(sharpe, 3),
            "sortino_ratio": round(sortino, 3),
            "max_drawdown": round(max_drawdown, 2),
            "max_drawdown_pct": round(max_drawdown_pct, 2),
            "commission_rate": self.commission,
            "slippage_rate": self.slippage,
            "equity_curve": equity_curve
        }


# ─────────────────────────────────────────────
# 示例:回测 XOM-CVX 配对
# ─────────────────────────────────────────────
if __name__ == "__main__":
    df_xom = fetch_historical_klines("XOM.US", "1d", limit=1000)
    df_cvx = fetch_historical_klines("CVX.US", "1d", limit=1000)

    # 使用协整检验得到的参数
    test_result = cointegration_test(df_xom["close"], df_cvx["close"])

    backtester = PairBacktester(
        symbol1="XOM.US",
        symbol2="CVX.US",
        hedge_ratio=test_result["hedge_ratio"],
        intercept=test_result["intercept"],
        entry_threshold=2.0,
        exit_threshold=0.5,
        stop_loss=3.0,
        window=20,
        initial_capital=100_000,
        commission=0.001,
        slippage=0.0005
    )

    results = backtester.run(df_xom, df_cvx)

    print("\n" + "=" * 60)
    print("配对交易回测结果")
    print("=" * 60)
    for key, value in results.items():
        if key != "equity_curve":
            print(f"  {key:<25}: {value}")

五、回测结果解读:三个核心问题

拿到回测报告后,最重要的是回答三个问题:

5.1 问题一:策略盈利吗?

关注总收益率和夏普比率。如果回测周期覆盖了至少一个完整的牛熊周期(2008、2020、2022 等),且夏普比率 > 1.0,则策略具有初步的盈利基础。

夏普比率 评价 注意事项
< 0.5 风险调整后无正阿尔法
0.5 - 1.0 一般 可接受,但成本敏感
1.0 - 2.0 良好 具备实际使用价值
> 2.0 优秀 需警惕过拟合风险

5.2 问题二:最大回撤能承受吗?

配对交易通常被宣传为"低回撤",但这是有前提的。以下几个因素会导致回撤超出预期:

  • 协整关系失效:两只股票的基本面逻辑发生根本变化(如一家公司被收购),价差不再回归。
  • 极端事件冲击:2008 年、2020 年 3 月的市场流动性枯竭会导致价差急剧扩大,触及止损阈值前就损失惨重。
  • 参数过拟合:用全部历史数据优化 entry/exit 阈值,会严重高估策略表现。

建议:最大回撤应控制在初始资本的 15% 以内,且回撤持续时间应不超过 30 个交易日。

5.3 问题三:交易成本敏感吗?

配对交易是均值回归策略,利润来自无数个小额的收敛交易。如果交易成本(佣金 + 滑点)超过每笔交易的平均利润,策略将无法盈利。

一个简单的检验:
$$\text{滑点损耗} = 2 \times \text{滑点率} \times \text{平均持仓金额}$$

如果这个损耗接近或超过平均单笔利润,则策略对执行质量极度敏感,不适合低流动性标的。

回测局限性说明:上述回测结果基于历史数据模拟,不构成未来收益保证。回测中存在以下局限性:未完全模拟实际交易中的滑点和市场冲击成本(已假设 0.05% 固定滑点);未考虑极端行情下的流动性枯竭风险;样本量有限,统计显著性可能不足。建议在实际使用前进行更长时间跨度的验证。


六、工程化注意事项

6.1 美股数据使用边界

必须强调一个关键的技术事实:

数据类型 是否支持 说明
历史 K 线数据 ✅ 10 年级别,清洗对齐 适用于跨周期策略回测
实时 K 线 /kline/latest 端点 可用于美股实时监控
逐笔成交(trades) ❌ 不支持 美股和 A 股不提供 tick 级逐笔

对于美股的实时监控,建议使用 TickDB /kline/latest 端点轮询(建议频率不超过 1 次/秒,配合 3001 限频处理),而非尝试 WebSocket 订阅 trades 频道。

6.2 仓位管理的黄金法则

配对交易中,永远不要"All in"一对配对。建议单对配比不超过总资本的 10%,同时运行不超过 5 对配对(分散单个配对失效的风险)。

6.3 协整关系的动态维护

协整关系不是永久的。建议每个季度重新运行一次筛选流程,检查已有配对的协整 p-value 是否恶化。如果某对的 p-value 从 0.002 上升到 0.15,应当考虑剔除并寻找替代配对。


结语

统计套利是一门关于耐心的生意。

它不追逐涨停板,不预测美联储决议,不押注宏观叙事。它安静地在两只股票的价格缝隙中,寻找那个总会回归的均衡点。

但这份耐心需要工程上的严谨支撑。筛选流程中的每一步——从协整检验到 Z-Score 阈值——都值得反复推敲。回测不是为了证明策略"有效",而是为了理解策略"在什么条件下"有效,以及"在什么条件下"会失效。

如果你希望亲手运行本文的筛选流程并验证配对效果,可以使用 TickDB 的历史 K 线数据接口获取 10 年级别的清洗数据,进行完整的回测验证。


下一步行动

如果你想亲手实现配对筛选流程

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你需要机构级的历史全量数据(用于更严苛的协整验证和参数优化),联系 [email protected] 了解专业版数据方案。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以让 AI 帮你生成配对筛选和数据获取的辅助代码。


本文不构成任何投资建议。市场有风险,投资需谨慎。