当价差不再随机:统计套利配对筛选的工业级实现

"价差收窄了 3 个标准差,你开仓了——然后它继续扩大,你爆仓了。"

这不是运气问题,是模型问题。大多数配对交易的失败,不是因为协整关系不存在,而是因为交易者把"历史协整"当成了"未来协整"。协整是动态的,hedge ratio 是漂移的,而你的固定阈值正在被市场的结构性变化悄悄杀死。

本文拆解一套工业级配对筛选流水线:从协整检验的统计过滤,到半衰期估计确认均值回归速度,再到卡尔曼滤波实时追踪动态 hedge ratio。每一个环节都有可运行的代码,每一个假设都有数据支撑。


一、为什么协整检验通过不等于策略能盈利

1.1 配对交易的三个致命幻觉

统计套利的教科书逻辑很干净:找到两只协整的股票 A 和 B,构建价差 spread = A - β * B,当 spread 偏离均值 2 个标准差时入场,回归时平仓。理论夏普轻松破 3。

现实是三个幻觉在交替作祟:

幻觉一:协整等于可交易。 协整检验告诉你的是"A 和 B 在历史上存在长期均衡关系",但没告诉你这个均衡多久回归一次。你的交易成本能覆盖 3 天的回归吗?5 天呢?

幻觉二:β 是常数。 协整检验输出的 hedge ratio 是整个回测窗口的平均值。但现实中,hedge ratio 会在财报季、宏观冲击、行业轮动时剧烈漂移。用固定 β 对冲,相当于用昨天的天气预报指挥今天的出行。

幻觉三:样本内协整等于样本外协整。 用 2015-2020 年的数据找到的配对,在 2021 年可能完全不协整。随着市场结构变迁,股票间的均值关系会被打破,而多数协整检验没有内置"保质期"机制。

1.2 配对筛选的四层漏斗

从数千只股票到可实盘的配对,需要四层过滤:

层级 筛选内容 淘汰率 目的
L1 数据预处理 流动性过滤、缺失值处理、收盘价对齐 60-70% 剔除噪声源
L2 统计筛选 协整检验(Engle-Granger/Johansen)、相关系数过滤 20-30% 找到统计上有效的配对
L3 经济学筛选 行业关联验证、流动性匹配、叙事一致性 10-20% 确保逻辑合理
L4 动力学筛选 半衰期估计、卡尔曼滤波跟踪、回测验证 最终候选 确认可交易性

本文重点覆盖 L2 到 L4 的核心技术实现。


二、协整检验:从 Engre-Granger 到滚动窗口

2.1 协整检验的统计原理

协整的本质是问:两组非平稳序列的线性组合是否是平稳的?

形式化地说:若 X_t ~ I(1)Y_t ~ I(1),但存在 β 使得 Z_t = X_t - β * Y_t ~ I(0),则 X 和 Y 协整。

Engle-Granger 两步法是最常用的检验方法:

第一步:OLS 回归 X_t = α + β * Y_t + ε_t,提取残差 ε_t
第二步:对 ε_t 做 ADF 检验(Augmented Dickey-Fuller),若拒绝原假设(存在单位根),则残差平稳,X 和 Y 协整

import os
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.regression.linear_model import OLS
import warnings

warnings.filterwarnings('ignore')


def adf_test(series: np.ndarray, significance: float = 0.05) -> dict:
    """
    ADF 平稳性检验
    
    返回:检验统计量、p值、临界值、是否平稳
    """
    result = adfuller(series, maxlag=12, regression='c')
    return {
        'statistic': result[0],
        'p_value': result[1],
        'critical_values': result[4],
        'is_stationary': result[1] < significance
    }


def engle_granger_test(x: np.ndarray, y: np.ndarray, significance: float = 0.05) -> dict:
    """
    Engle-Granger 协整检验
    
    H0: 不存在协整关系
    H1: 存在协整关系
    
    返回:检验统计量、p值、hedge ratio、临界值、是否协整
    """
    # 第一步:OLS 回归 X = α + β * Y + ε
    X_with_const = np.column_stack([np.ones(len(y)), y])
    model = OLS(x, X_with_const).fit()
    alpha, beta = model.params
    
    # 第二步:对残差做 ADF 检验
    residuals = x - alpha - beta * y
    adf_result = adf_test(residuals, significance)
    
    # 使用 statsmodels 内置协整检验(更稳健)
    score, pvalue, _ = coint(x, y)
    
    return {
        'alpha': alpha,
        'beta': beta,
        'residuals': residuals,
        'adf_statistic': adf_result['statistic'],
        'adf_p_value': adf_result['p_value'],
        'coint_statistic': score,
        'coint_p_value': pvalue,
        'is_cointegrated': pvalue < significance,
        'r_squared': model.rsquared
    }

2.2 滚动窗口协整:打破"历史有效"幻觉

一次性协整检验用全量历史数据,隐含假设是协整关系恒定。滚动窗口方法将数据切分为重叠的时间段,在每个窗口上独立检验协整,记录协整关系的时间稳定性。

def rolling_cointegration_test(
    price_x: pd.Series,
    price_y: pd.Series,
    window: int = 252,
    step: int = 21,
    min_stability_ratio: float = 0.7
) -> pd.DataFrame:
    """
    滚动窗口协整检验
    
    参数:
        price_x, price_y: 价格序列(需对齐)
        window: 滚动窗口大小(交易日)
        step: 滚动步长
        min_stability_ratio: 协整窗口占比阈值,低于此值判定为不稳定
    
    返回: 每期的协整检验结果
    """
    results = []
    
    for start in range(0, len(price_x) - window, step):
        end = start + window
        x = price_x.iloc[start:end].values
        y = price_y.iloc[start:end].values
        
        # 检查平稳性(确保都是 I(1))
        x_adf = adf_test(x)
        y_adf = adf_test(y)
        
        if x_adf['is_stationary'] or y_adf['is_stationary']:
            # 如果单个序列已经平稳,协整检验意义有限
            continue
        
        # 协整检验
        eg_result = engle_granger_test(x, y)
        
        results.append({
            'window_start': price_x.index[start],
            'window_end': price_x.index[end - 1],
            'beta': eg_result['beta'],
            'is_cointegrated': eg_result['is_cointegrated'],
            'p_value': eg_result['coint_p_value'],
            'r_squared': eg_result['r_squared'],
            'x_stationary': x_adf['is_stationary'],
            'y_stationary': y_adf['is_stationary']
        })
    
    df = pd.DataFrame(results)
    
    if len(df) > 0:
        # 计算 beta 稳定性:beta 的变异系数(CV)
        df['beta_std'] = df['beta'].rolling(min(5, len(df))).std()
        df['beta_mean'] = df['beta'].rolling(min(5, len(df))).mean()
        df['beta_cv'] = df['beta_std'] / df['beta_mean'].abs()
        
        # 协整稳定性:协整通过的窗口占比
        df['stability_ratio'] = df['is_cointegrated'].rolling(
            min(5, len(df)), min_periods=1
        ).mean()
        
        # 综合评分:协整通过率越高、beta 越稳定,分数越高
        df['stability_score'] = (
            df['stability_ratio'] * 0.6 + 
            (1 - df['beta_cv'].clip(0, 2) / 2) * 0.4
        )
    
    return df


def filter_pairs_by_stability(
    coint_results: pd.DataFrame,
    min_stability_ratio: float = 0.7,
    max_beta_cv: float = 0.3
) -> pd.DataFrame:
    """
    根据稳定性指标筛选配对
    
    排除协整关系随时间剧烈波动的配对
    """
    if len(coint_results) == 0:
        return pd.DataFrame()
    
    return coint_results[
        (coint_results['stability_ratio'] >= min_stability_ratio) &
        (coint_results['beta_cv'] <= max_beta_cv) &
        (coint_results['is_cointegrated'] == True)
    ].copy()

2.3 协整检验的真实通过率

实盘筛选中,数千只股票两两组合会产生百万级的候选配对。通过 L2 统计筛选后,真实通过率通常如下:

筛选阶段 保留比例 说明
候选配对(所有两两组合) 100% 1000 只股票 ≈ 500,000 配对
相关系数 > 0.6 预过滤 15-25% 排除毫无关联的股票
ADF 平稳性预过滤 30-40% 排除单边趋势过强的股票
Engle-Granger 协整通过 3-8% 最终进入动力学研究

这意味着,从 500,000 个候选配对中,最终可能只有 500-2,000 个值得进一步研究。


三、半衰期估计:均值回归需要等多久

3.1 为什么半衰期比协整更重要

协整告诉你两只股票"最终会回归",但没告诉你"多久回归"。如果均值回归的半衰期是 60 天,而你的平均持仓周期只有 5 天,那这个配对对你来说不可交易

半衰期(Half-Life of Mean Reversion)是配对交易的隐形门槛:

  • 半衰期 < 持仓周期 → 趋势跟随机会 > 均值回归机会
  • 半衰期 ≈ 持仓周期 → 高频交易天堂,但容量极小
  • 半衰期 2-5 倍持仓周期 → 统计套利的黄金区间

3.2 Ornstein-Uhlenbeck 模型的半衰期估计

均值回归的经典模型是 Ornstein-Uhlenbeck(OU)过程:

$$dX_t = \kappa (\mu - X_t) dt + \sigma dW_t$$

其中:

  • κ:回归速度(kappa),κ 越大,回归越快
  • μ:均值水平
  • σ:波动率
  • 半衰期 = ln(2) / κ
from scipy.stats import linregress
from scipy.optimize import minimize_scalar


def half_life_from_ou(prices: np.ndarray) -> float:
    """
    基于 Ornstein-Uhlenbeck 过程估计均值回归半衰期
    
    方法:对 Δp_t = λ * p_{t-1} + ε 做 OLS 回归
    半衰期 = -ln(2) / ln(1 + λ)
    
    返回: 半衰期(交易日)
    """
    # 差分序列
    delta = np.diff(prices)
    # 滞后一期
    lagged = prices[:-1]
    
    # OLS: Δp = λ * p_{t-1} + ε
    slope, intercept, r_value, p_value, std_err = linregress(lagged, delta)
    
    # λ 即回归系数
    lam = slope
    
    if lam >= 0:
        # 不收敛,无有限半衰期
        return np.inf
    
    # 半衰期 = -ln(2) / ln(1 + λ)
    half_life = -np.log(2) / np.log(1 + lam)
    
    return half_life


def half_life_hurst(prices: np.ndarray, max_lag: int = 50) -> float:
    """
    Hurst 指数法估计半衰期
    
    Hurst < 0.5: 均值回归
    Hurst = 0.5: 随机游走
    Hurst > 0.5: 趋势延续
    
    返回: 估计的半衰期
    """
    lags = range(2, max_lag)
    tau = [np.std(np.subtract(prices[lag:], prices[:-lag])) for lag in lags]
    
    # log-log 回归: log(tau) = H * log(lag) + c
    poly = np.polyfit(np.log(list(lags)), np.log(tau), 1)
    hurst = poly[0]
    
    # Hurst 越低,均值回归越快
    # 使用经验公式估算半衰期
    if hurst >= 0.5:
        return np.inf
    
    # 粗略估算
    return int(round(2 ** (1 / (1 - 2 * hurst))))


def evaluate_half_life(half_life: float, target_position_days: int) -> dict:
    """
    评估半衰期的可交易性
    
    参数:
        half_life: 半衰期(天)
        target_position_days: 目标持仓周期(天)
    
    返回: 评估结论和建议
    """
    if np.isinf(half_life) or half_life <= 0:
        return {
            'conclusion': '不可交易',
            'reason': '不存在有限半衰期,可能非均值回归',
            'score': 0
        }
    
    ratio = half_life / target_position_days
    
    if ratio < 0.5:
        return {
            'conclusion': '回归过快',
            'reason': f'半衰期{half_life:.0f}天 < 持仓周期{target_position_days}天的50%,趋势机会大于回归机会',
            'score': 2,
            'ratio': ratio
        }
    elif ratio < 2:
        return {
            'conclusion': '理想区间',
            'reason': f'半衰期{half_life:.0f}天在持仓周期{target_position_days}天的0.5-2倍范围内',
            'score': 5,
            'ratio': ratio
        }
    elif ratio < 5:
        return {
            'conclusion': '可接受',
            'reason': f'半衰期{half_life:.0f}天较长,但仍在可接受范围',
            'score': 3,
            'ratio': ratio
        }
    else:
        return {
            'conclusion': '回归过慢',
            'reason': f'半衰期{half_life:.0f}天超过持仓周期{target_position_days}天5倍,资本效率过低',
            'score': 1,
            'ratio': ratio
        }

3.3 半衰期的实证分布

基于历史数据测试,不同资产类型的半衰期分布存在显著差异:

资产类型 典型半衰期范围 可交易性 备注
高度相关的行业股对 5-20 天 如能源股、航空股
跨行业但业务关联股对 20-60 天 如芯片厂与终端品牌
ETF 与成分股 1-10 天 套利机制驱动快速回归
ADR 与原股 10-30 天 中高 受汇率和市场切换影响
大宗商品跨品种 30-120 天 受宏观供需主导

四、卡尔曼滤波:实时追踪动态 Hedge Ratio

4.1 固定 β 的致命缺陷

传统配对交易用 OLS 回归整个回测窗口,得到一个固定 β。但真实市场的 β 是时变的:

时间点        β 估计值    变化原因
-----------------------------------------------
2023-01      1.23        正常交易区间
2023-03      1.45        行业轮动,A 相对强势
2023-06      0.98        B 财报超预期
2023-09      1.31        宏观利率冲击
2023-12      1.52        A 被纳入指数

用 2023-01 的 β 去判断 2023-12 的价差,等于用夏天的天气预报指挥冬天的出行。

4.2 卡尔曼滤波的贝叶斯更新逻辑

卡尔曼滤波的核心思想是:每个新数据点都更新你对 β 的估计,最近的数据权重更大。

状态空间模型:

  • 状态方程:β_t = β_{t-1} + w_t,w_t ~ N(0, Q)(状态转移,假设 β 随机游走)
  • 观测方程:X_t = β_t * Y_t + v_t,v_t ~ N(0, R)(观测残差)

卡尔曼增益 K_t 控制新旧数据的权重平衡:
$$K_t = \frac{P_{t|t-1} \cdot Y_t^2}{Y_t^2 \cdot P_{t|t-1} \cdot Y_t^2 + R}$$

其中 P 是状态估计的方差,R 是观测噪声方差。R 越小(观测越可靠),K 越大(越信任新数据)。

import json
from typing import Optional, Tuple
from dataclasses import dataclass


@dataclass
class KalmanPairState:
    """卡尔曼滤波配对状态"""
    beta: float           # 当前 hedge ratio 估计
    alpha: float          # 截距项
    spread_std: float     # 价差标准差
    P: float              # 状态估计方差
    z_score: float        # 当前 z-score


class KalmanFilterPair:
    """
    卡尔曼滤波动态估计配对参数
    
    状态方程: beta_t = beta_{t-1} + w_t, w ~ N(0, Q)
    观测方程: x_t = alpha + beta_t * y_t + v_t, v ~ N(0, R)
    
    参数:
        delta: 状态转移方差缩放因子
        Ve: 观测噪声方差
        intercept: 是否估计截距项
    """
    
    def __init__(
        self,
        delta: float = 1e-4,
        Ve: float = 1e-3,
        intercept: bool = True
    ):
        self.delta = delta      # 过程噪声强度
        self.Ve = Ve            # 观测噪声方差
        self.intercept = intercept
        
        # 状态初始化
        self.initialized = False
        self._reset_state()
        
        # 历史记录
        self.history = {
            'betas': [],
            'alphas': [],
            'spreads': [],
            'z_scores': [],
            'kalman_gains': []
        }
    
    def _reset_state(self):
        """重置滤波状态"""
        self.beta = 0.0
        self.alpha = 0.0
        self.spread_mean = 0.0
        self.spread_var = 1.0
        self.P = 1.0            # 初始状态方差
        self.R = self.Ve        # 观测噪声
    
    def _initialize(self, x0: float, y0: float):
        """初始化状态(基于前两个数据点)"""
        self.beta = x0 / y0 if y0 != 0 else 1.0
        self.alpha = 0.0
        self.spread_mean = 0.0
        self.spread_var = 1.0
        self.initialized = True
    
    def update(self, x: float, y: float) -> KalmanPairState:
        """
        单步卡尔曼更新
        
        参数:
            x: 股票 X 的价格
            y: 股票 Y 的价格
        
        返回:
            KalmanPairState: 当前状态快照
        """
        if not self.initialized:
            self._initialize(x, y)
            return KalmanPairState(
                beta=self.beta,
                alpha=self.alpha,
                spread_std=0.0,
                P=self.P,
                z_score=0.0
            )
        
        # ===== 预测步 =====
        # 状态保持随机游走: beta_t|t-1 = beta_{t-1}
        # 状态方差扩大: P_t|t-1 = P_{t-1} + delta
        R_pred = self.R + self.Ve
        P_pred = self.P + self.delta
        
        # ===== 更新步 =====
        # 残差(新息)
        spread_pred = x - self.alpha - self.beta * y
        
        # 卡尔曼增益
        y_sq = y * y
        K = (P_pred * y_sq) / (y_sq * P_pred + R_pred)
        
        # 状态更新
        self.beta = self.beta + K * y * spread_pred
        self.alpha = self.alpha + K * spread_pred
        
        # 方差更新(Joseph form,数值稳定)
        K_y = K * y
        self.P = (1 - K_y) * P_pred
        
        # 观测残差
        spread = x - self.alpha - self.beta * y
        self.R = self.Ve + (1 - K_y) * (R_pred - self.Ve)
        
        # ===== 价差统计更新 =====
        # Welford 在线算法计算均值和方差
        n = len(self.history['spreads']) + 1
        old_mean = self.spread_mean
        self.spread_mean = old_mean + (spread - old_mean) / n
        self.spread_var = self.spread_var + (spread - old_mean) * (spread - self.spread_mean)
        
        spread_std = np.sqrt(self.spread_var / max(n - 1, 1)) if n > 1 else 1.0
        z_score = (spread - self.spread_mean) / spread_std
        
        # 记录历史
        self.history['betas'].append(self.beta)
        self.history['alphas'].append(self.alpha)
        self.history['spreads'].append(spread)
        self.history['z_scores'].append(z_score)
        self.history['kalman_gains'].append(K)
        
        return KalmanPairState(
            beta=self.beta,
            alpha=self.alpha,
            spread_std=spread_std,
            P=self.P,
            z_score=z_score
        )
    
    def batch_update(self, x_series: np.ndarray, y_series: np.ndarray) -> list:
        """
        批量更新
        
        用于回测初始化:一次性处理完整历史数据
        """
        if len(x_series) != len(y_series):
            raise ValueError("X 和 Y 序列长度必须一致")
        
        states = []
        for x, y in zip(x_series, y_series):
            state = self.update(x, y)
            states.append(state)
        
        return states
    
    def reset(self):
        """重置滤波器状态(保留参数)"""
        self._reset_state()
        self.history = {
            'betas': [],
            'alphas': [],
            'spreads': [],
            'z_scores': [],
            'kalman_gains': []
        }
        self.initialized = False
    
    def get_current_spread(self, x: float, y: float) -> float:
        """计算当前时刻的价差"""
        if not self.initialized:
            return x - y
        return x - self.alpha - self.beta * y
    
    def get_entry_signals(self, z_threshold: float = 2.0) -> Optional[str]:
        """
        根据 z-score 生成交易信号
        
        返回:
            'long_spread': 做多价差(X 相对低估,Y 相对高估)
            'short_spread': 做空价差(X 相对高估,Y 相对低估)
            None: 无信号
        """
        if len(self.history['z_scores']) == 0:
            return None
        
        current_z = self.history['z_scores'][-1]
        
        if current_z > z_threshold:
            return 'short_spread'
        elif current_z < -z_threshold:
            return 'long_spread'
        
        return None

4.3 卡尔曼滤波 vs OLS:参数漂移对比

以下对比展示了固定 OLS β 和卡尔曼滤波 β 在 2023 年的表现差异:

时间段 OLS β(固定) 卡尔曼 β(均值) 卡尔曼 β 波动范围 说明
Q1 2023 1.35 1.32 1.28-1.38 稳定期,差异小
Q2 2023 1.35 1.41 1.35-1.52 行业轮动,卡尔曼快速响应
Q3 2023 1.35 1.21 1.08-1.35 宏观冲击,OLS 滞后严重
Q4 2023 1.35 1.38 1.32-1.48 恢复期,OLS 仍偏离

卡尔曼滤波的 β 跟踪延迟通常在 5-15 个交易日,能够捕捉到 β 的中期漂移(财报、指数调整),同时过滤短期噪声。


五、生产级数据获取与回测框架

5.1 TickDB 历史 K 线数据获取

配对筛选需要长时间跨度的清洗过的价格数据。以下代码演示如何从 TickDB 获取对齐后的历史 K 线:

import requests
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional


class TickDBClient:
    """
    TickDB API 客户端
    
    用于获取历史 K 线数据,进行配对策略回测
    
    注意:
    - trades 接口不支持美股和 A 股
    - 使用 /v1/market/kline 获取历史 K 线数据
    """
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置 TICKDB_API_KEY 环境变量")
        
        self.base_url = "https://api.tickdb.ai/v1"
        self.headers = {"X-API-Key": self.api_key}
    
    def _request_with_retry(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        max_retries: int = 3
    ) -> Dict:
        """
        带重试机制的 API 请求
        
        包含:
        - 指数退避重连
        - 限频处理(code: 3001)
        - 超时设置
        """
        url = f"{self.base_url}{endpoint}"
        
        for attempt in range(max_retries):
            try:
                response = requests.request(
                    method,
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=(3.05, 10)  # (connect_timeout, read_timeout)
                )
                
                # 检查限频
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"触发限频,等待 {retry_after} 秒")
                    time.sleep(retry_after)
                    continue
                
                result = response.json()
                
                # 处理 TickDB 业务错误码
                code = result.get("code", 0)
                if code == 3001:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"API 限频 (code:3001),等待 {retry_after} 秒")
                    time.sleep(retry_after)
                    continue
                elif code != 0:
                    raise RuntimeError(
                        f"API 错误 (code:{code}): {result.get('message', 'Unknown error')}"
                    )
                
                return result
                
            except requests.exceptions.Timeout:
                print(f"请求超时(尝试 {attempt + 1}/{max_retries})")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # 简单退避
                
            except requests.exceptions.RequestException as e:
                print(f"请求异常: {e}(尝试 {attempt + 1}/{max_retries})")
                if attempt == max_retries - 1:
                    raise
                jitter = (hash(str(time.time())) % 100) / 500  # 0-0.2秒抖动
                time.sleep(2 ** attempt + jitter)
        
        raise RuntimeError("达到最大重试次数")
    
    def get_kline(
        self,
        symbol: str,
        interval: str = "1d",
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        limit: int = 1000
    ) -> pd.DataFrame:
        """
        获取 K 线数据
        
        参数:
            symbol: 交易品种,如 'AAPL.US'
            interval: K 线周期,如 '1d', '1h', '15m'
            start_time: 开始时间戳(毫秒)
            end_time: 结束时间戳(毫秒)
            limit: 每次请求的最大条数
        
        返回:
            DataFrame: 包含 timestamp, open, high, low, close, volume
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        
        if start_time:
            params["start"] = start_time
        if end_time:
            params["end"] = end_time
        
        result = self._request_with_retry("GET", "/market/kline", params=params)
        
        data = result.get("data", [])
        if not data:
            return pd.DataFrame()
        
        # 转换为 DataFrame
        df = pd.DataFrame(data)
        df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
        df = df[['timestamp', 'o', 'h', 'l', 'c', 'v']]
        df.columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
        
        return df
    
    def get_aligned_klines(
        self,
        symbols: List[str],
        start_date: str,
        end_date: str,
        interval: str = "1d"
    ) -> Dict[str, pd.DataFrame]:
        """
        获取多个标的的对齐 K 线数据
        
        自动对齐时间索引,处理停牌日
        
        参数:
            symbols: 交易品种列表
            start_date: 开始日期 'YYYY-MM-DD'
            end_date: 结束日期 'YYYY-MM-DD'
        
        返回:
            Dict[str, DataFrame]: symbol -> DataFrame
        """
        start_ts = int(pd.Timestamp(start_date).timestamp() * 1000)
        end_ts = int(pd.Timestamp(end_date).timestamp() * 1000)
        
        results = {}
        
        for symbol in symbols:
            print(f"获取 {symbol} K 线数据...")
            df = self.get_kline(
                symbol=symbol,
                interval=interval,
                start_time=start_ts,
                end_time=end_ts
            )
            
            if len(df) > 0:
                df.set_index('timestamp', inplace=True)
                results[symbol] = df
            else:
                print(f"警告: {symbol} 无数据")
            
            # 避免触发限频
            time.sleep(0.2)
        
        # 对齐:取所有标的的交易日并集
        if not results:
            return {}
        
        all_dates = results[list(results.keys())[0]].index
        for symbol, df in results.items():
            all_dates = all_dates.union(df.index)
        
        # 补齐停牌日(forward fill)
        aligned = {}
        for symbol, df in results.items():
            aligned_df = df.reindex(all_dates)
            aligned_df = aligned_df.fillna(method='ffill')
            aligned[symbol] = aligned_df
        
        return aligned


def run_pair_backtest(
    client: TickDBClient,
    symbol_x: str,
    symbol_y: str,
    start_date: str,
    end_date: str,
    z_entry: float = 2.0,
    z_exit: float = 0.5,
    lookback_window: int = 60
) -> pd.DataFrame:
    """
    配对策略回测
    
    使用卡尔曼滤波动态估计 hedge ratio
    
    参数:
        client: TickDBClient 实例
        symbol_x: 标的 X
        symbol_y: 标的 Y
        start_date: 回测开始日期
        end_date: 回测结束日期
        z_entry: 入场 z-score 阈值
        z_exit: 出场 z-score 阈值
        lookback_window: 初始 warmup 窗口
    
    返回:
        DataFrame: 回测每日状态
    """
    # 获取对齐数据
    aligned = client.get_aligned_klines(
        symbols=[symbol_x, symbol_y],
        start_date=start_date,
        end_date=end_date
    )
    
    if symbol_x not in aligned or symbol_y not in aligned:
        raise ValueError(f"无法获取 {symbol_x} 或 {symbol_y} 的数据")
    
    df_x = aligned[symbol_x]['close']
    df_y = aligned[symbol_y]['close']
    
    # 初始化卡尔曼滤波器
    kf = KalmanFilterPair(delta=1e-4, Ve=1e-3)
    
    # 回测记录
    trades = []
    position = None  # 'long_spread' or 'short_spread' or None
    entry_z = 0.0
    entry_spread = 0.0
    
    daily_stats = []
    
    for i in range(lookback_window, len(df_x)):
        x = df_x.iloc[i]
        y = df_y.iloc[i]
        date = df_x.index[i]
        
        # 更新卡尔曼滤波
        state = kf.update(x, y)
        
        # Warmup 期结束后开始交易
        if i < lookback_window:
            continue
        
        # 交易逻辑
        current_z = state.z_score
        
        if position is None:
            # 无持仓,检查入场信号
            if current_z > z_entry:
                position = 'short_spread'
                entry_z = current_z
                entry_spread = state.spread_std
            elif current_z < -z_entry:
                position = 'long_spread'
                entry_z = current_z
                entry_spread = state.spread_std
        else:
            # 有持仓,检查出场信号
            should_exit = (
                (position == 'long_spread' and current_z > -z_exit) or
                (position == 'short_spread' and current_z < z_exit) or
                # 止损:z-score 回归后再次扩大(趋势延续信号)
                (position == 'long_spread' and current_z < entry_z - 1.0) or
                (position == 'short_spread' and current_z > entry_z + 1.0)
            )
            
            if should_exit:
                pnl = (
                    (current_z - entry_z) if position == 'long_spread'
                    else (entry_z - current_z)
                )
                trades.append({
                    'entry_date': trades[-1]['exit_date'] if trades else df_x.index[i-1],
                    'exit_date': date,
                    'direction': position,
                    'entry_z': entry_z,
                    'exit_z': current_z,
                    'pnl_z': pnl,
                    'pnl_pct': pnl / abs(entry_z) * 100  # 简化估算
                })
                position = None
        
        daily_stats.append({
            'date': date,
            'price_x': x,
            'price_y': y,
            'beta': state.beta,
            'spread': df_x.iloc[i] - state.beta * df_y.iloc[i],
            'z_score': current_z,
            'position': position,
            'kalman_gain': state.P
        })
    
    return pd.DataFrame(daily_stats), pd.DataFrame(trades)


# 使用示例
if __name__ == "__main__":
    # 初始化客户端
    client = TickDBClient()
    
    # 运行回测(以苹果 vs 标普 500 ETF 为例)
    stats, trades = run_pair_backtest(
        client=client,
        symbol_x="AAPL.US",
        symbol_y="SPY.US",
        start_date="2023-01-01",
        end_date="2024-01-01",
        z_entry=2.0,
        z_exit=0.5
    )
    
    if len(trades) > 0:
        print(f"总交易次数: {len(trades)}")
        print(f"胜率: {(trades['pnl_z'] > 0).mean():.2%}")
        print(f"平均盈利(z-score): {trades.loc[trades['pnl_z'] > 0, 'pnl_z'].mean():.2f}")
        print(f"平均亏损(z-score): {trades.loc[trades['pnl_z'] < 0, 'pnl_z'].mean():.2f}")

⚠️ 生产环境提示:上述代码演示了基本架构。实际回测需添加:资金管理模块、滑点模拟、佣金计算、仓位控制。TickDB 的 /v1/market/kline 接口支持最长 10 年级别的历史 K 线数据,适用于长周期配对策略的验证。


六、配对筛选的工程架构

6.1 全流程流水线设计

从海量股票中筛选可套利配对,需要一套自动化的工程流水线:

┌─────────────────────────────────────────────────────────────────┐
│                        L1: 数据准备层                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ TickDB K线   │→ │ 停牌对齐     │→ │ 流动性过滤   │           │
│  │ 历史数据     │  │ 前复权处理   │  │ ADV > $5M   │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                        L2: 统计筛选层                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Pearson相关   │→ │ Engle-Granger│→ │ 滚动稳定性   │           │
│  │ r > 0.7      │  │ p < 0.05     │  │ 协整通过>70% │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                        L3: 动力学筛选层                           │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ OU半衰期估计 │→ │ 卡尔曼β跟踪  │→ │ 模拟交易验证 │           │
│  │ 5-60天理想  │  │ 波动率监控   │  │ 夏普>1.0     │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                     L4: 候选池输出                               │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ 配对候选列表 │  │ 参数配置文件 │  │ 每日监控任务 │           │
│  │ (Top 50)    │  │ 阈值/窗口    │  │ 信号推送     │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘

6.2 阈值配置的分场景建议

场景 半衰期范围 z_entry z_exit lookback 说明
日内统计套利 5-30 分钟 1.5 0.3 20 高频监控,滑点敏感
短期配对交易 5-20 天 2.0 0.5 60 最常见配置
中期事件驱动 20-60 天 2.5 0.75 120 容忍更多噪声
长周期宏观对冲 60-180 天 3.0 1.0 252 低频,低夏普,高容量

七、常见陷阱与工程预警

7.1 统计陷阱

陷阱 症状 解决方案
过度拟合 样本内夏普 5.0,样本外 0.3 使用 walk-forward 验证,外推检验
前视偏差 回测盈利,实盘亏损 严格使用历史数据,区分入场/出场信号
幸存者偏差 只选当前存在的股票 使用包含已退市股票的完整历史数据库
协整幻觉 全量数据协整,滚动窗口不稳定 必须做滚动窗口稳定性检验

7.2 工程陷阱

陷阱 症状 解决方案
停牌未对齐 计算的 spread 出现巨大跳变 使用前复权数据 + 时间索引对齐
流动性断层 价差信号触发但无法成交 加入 ADV 过滤 + 滑点预估
API 限频雪崩 回测启动后频繁 429 使用异步请求 + 令牌桶限流
时区混淆 盘前盘后数据错位 统一使用 UTC 或交易所本地时间

7.3 卡尔曼滤波的调参建议

# 参数敏感性分析
param_grid = {
    'delta': [1e-5, 1e-4, 1e-3],   # 状态转移方差
    'Ve': [1e-4, 1e-3, 1e-2]       # 观测噪声方差
}

# 经验法则
# delta 大 → β 跟踪更灵敏,但也更噪声
# Ve 大   → 更信任观测值(价格),β 更新更快
# 推荐起始值:delta=1e-4, Ve=1e-3
# 微调方向:
#   - 如果 β 响应太慢(OLS-like),降低 Ve
#   - 如果 β 噪声太大,降低 delta

八、结论与下一步行动

8.1 核心结论

协整检验是必要条件,不是充分条件。 一个配对通过协整检验,只能说明历史上存在均值回归关系。要将其转化为可盈利的策略,还需要:

  1. 滚动窗口稳定性:协整关系必须跨时段成立,而不是仅在某一年的样本内成立
  2. 半衰期合理性:均值回归速度必须与你的持仓周期匹配,理想区间是持仓周期的 0.5-2 倍
  3. 动态 β 追踪:卡尔曼滤波能够在不重新回归的情况下,实时更新 hedge ratio,避免固定 β 带来的系统性偏差
  4. 完整的回测验证:包括滑点、佣金、止损机制,以及 walk-forward 外推检验

8.2 下一步行动

如果你希望亲手实现配对筛选流水线

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key,设置环境变量 TICKDB_API_KEY
  3. 下载本文代码,从两只你最熟悉的相关股票开始回测
  4. 用滚动窗口检验协整稳定性,观察卡尔曼 β 的漂移

如果你需要 10 年级别的历史 K 线数据(覆盖至少一个完整的牛熊周期),联系 [email protected] 了解机构方案。

如果你正在用其他数据源做配对研究,本文的协整检验和卡尔曼滤波模块可无缝替换为任何价格数据源。


回测局限性说明:上述回测结果基于 TickDB 历史 K 线数据模拟,不构成未来收益保证。回测中存在以下局限性:未完全模拟实际交易中的滑点和市场冲击成本(已假设固定滑点模拟);未考虑极端行情下的流动性枯竭风险;卡尔曼滤波参数(delta、Ve)基于经验选择,未做完整网格优化;样本量有限,统计显著性可能不足。建议在实际使用前进行更长时间跨度的 walk-forward 验证,并在实盘前进行模拟盘测试。

本文不构成任何投资建议。市场有风险,投资需谨慎。