市场在波动,但价差总会回归

凌晨三点,你的监控系统尖叫了一声。

不是暴跌,不是熔断,是两只看起来毫无关系的股票——它们之间的价差突然突破了历史均值三个标准差。你盯着屏幕,手悬在鼠标上方,脑子飞速运转:这是假信号还是真的套利机会?这个价差会回归吗?回归的速度够快吗?

大多数量化新人会在这一步做出错误的判断:要么因为数据挖掘偏误过度自信,要么因为缺乏系统方法而错过真正的机会。而成熟的统计套利者,知道问题的答案藏在三个关键指标里:协整关系决定了价差会不会回归,半衰期决定了回归需要多久,卡尔曼滤波动态对冲比率决定了在回归路径上你能抓住多少。

本文从一个问题出发:面对全市场数千只股票,如何系统性地筛选出值得关注的配对,并构建一个可持续运行的统计套利框架。


一、为什么选股式筛选行不通

在讨论方法之前,先理解为什么很多量化新手的方法从根上就错了。

最常见的思路是:先选一只"核心股",然后遍历其他股票找相关性最高的。这在数学上可行,但在统计上埋了大雷——相关性是双尾的,两只股票可能高度负相关,但你不知道这种负相关什么时候会断裂。更关键的是,高相关不意味着协整,而协整才是统计套利的核心前提。

你真正需要的不是一个选股系统,而是一个配对筛选系统。核心区别在于:

维度 选股式思维 配对筛选思维
处理单位 单只股票 两只股票的价差序列
核心指标 相关性、β值 协整统计量、半衰期
稳定性假设 股票特性稳定 价差的均值回复特性稳定
候选规模 O(n) O(n²) — 这是真正的瓶颈

从 3000 只股票中选配对,数学上是 C(3000, 2) = 4,498,500 对。这个量级的全量检验在计算上不现实,你需要一套多阶段漏斗筛选方法。


二、理论框架:三块基石

2.1 协整:不是相关,而是共同驱动

相关描述的是两个变量朝同一方向移动的倾向,协整描述的是两个变量围绕一个共同趋势保持稳定关系的能力。

用更直观的话说:两只股票可能都在涨,但涨的速度不一样。相关性捕捉不到这种分化,协整却能捕捉到一个关键问题——当这种分化超过某个阈值后,它们会不会回到原来的相对位置?

形式上,如果两个 I(1) 序列 X_t 和 Y_t 的线性组合 Z_t = X_t - β·Y_t 是 I(0)(平稳的),则 X 和 Y 是协整的,β 称为协整向量

工程预警:协整检验需要足够长的历史数据(通常至少 120-250 个观测点),且要求数据频率一致。在使用 TickDB 获取数据时,务必检查数据连续性——任何停牌日缺失都会严重影响 ADF 检验结果。

2.2 半衰期:均值回复有多快

即使两个序列协整,也不意味着套利机会有经济价值。半衰期(half-life)衡量的是价差偏离均值后,回归一半所需的时间。

从 Ornstein-Uhlenbeck 过程出发:

$$dy_t = \lambda(\mu - y_{t-1})dt + dW_t$$

半衰期的估计公式为:

$$T_{half} = \frac{\ln(2)}{\hat{\lambda}}$$

其中 λ 从自回归回归中估计:

$$\Delta y_t = \lambda y_{t-1} + \epsilon_t$$

实践中的判断标准:

  • 半衰期 < 5 分钟:均值回复极快,高频策略专属
  • 半衰期 5 分钟 ~ 1 天:日内~短期套利,适合日内交易
  • 半衰期 1 ~ 30 天:中期套利,需要隔夜持仓能力
  • 半衰期 > 60 天:不适合统计套利,更接近趋势跟踪范畴

2.3 卡尔曼滤波:动态的才是真实的

传统配对交易用固定窗口的 OLS 计算对冲比率(hedge ratio)。但市场的产业结构在变化——公司并购、新产品发布、宏观冲击都会改变两只股票之间的关系。静态 hedge ratio 在结构性变化面前会失效

卡尔曼滤波通过状态空间模型解决了这个问题:

  • 状态方程:$\beta_t = \beta_{t-1} + \eta_t$(假设 hedge ratio 服从随机游走)
  • 观测方程:$y_t = \alpha + \beta_t x_t + \epsilon_t$

每来一个新的价格观测,滤波器立即更新对 hedge ratio 的估计。优势在于:

  • 不需要存储完整历史窗口
  • 自然加权近期数据
  • 能检测到结构性断点(通过估计误差方差的突变)

三、系统架构:四阶段筛选漏斗

阶段1: 候选池构建
━━━━━━━━━━━━━━━━━━
2500只股票
↓
按GICS行业分组 → 每组内部配对(行业内天然存在产业链关联)
↓
候选对数量: ~50,000

阶段2: 相关性预筛
━━━━━━━━━━━━━━━━━━
候选对 → 计算过去60天价格相关性
↓
阈值: |ρ| > 0.3 AND |ρ| < 0.95
↓
通过数量: ~8,000

阶段3: 协整检验
━━━━━━━━━━━━━━━━━━
通过候选 → Engle-Granger两步法
↓
ADF检验 p-value < 0.05
↓
通过数量: ~200

阶段4: 质量评估
━━━━━━━━━━━━━━━━━━
200对 → 半衰期过滤 (1天 < T_half < 30天)
         + 价差波动率 > 0.5σ
↓
最终配对池: ~15-40对

这个漏斗的设计逻辑是:用计算成本低的指标做粗筛,用统计意义最强的指标做精筛。协整检验是 O(n²) 计算中最贵的步骤,所以相关性预筛必须足够有效,才能在有限时间内完成全市场扫描。


四、生产级代码:数据获取与协整检验

4.1 数据获取层

import os
import time
import json
import random
import warnings
import numpy as np
import pandas as pd
import requests
from datetime import datetime, timedelta
from itertools import combinations

warnings.filterwarnings('ignore')

class TickDBClient:
    """TickDB REST API 客户端 - 带重连和限频处理"""

    def __init__(self, api_key=None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置 TICKDB_API_KEY 环境变量")
        self.base_url = "https://api.tickdb.ai/v1"
        self.headers = {"X-API-Key": self.api_key}
        self._rate_limit_backoff = 5  # 秒

    def _request(self, method, endpoint, params=None, max_retries=3):
        """带心跳检测和指数退避的请求封装"""
        url = f"{self.base_url}{endpoint}"
        for attempt in range(max_retries):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    params=params,
                    timeout=(3.05, 10)  # (connect_timeout, read_timeout)
                )

                # 限频处理
                if response.status_code == 429 or \
                   (isinstance(response.json(), dict) and
                    response.json().get("code") == 3001):
                    retry_after = int(response.headers.get("Retry-After", self._rate_limit_backoff))
                    print(f"  ⚠️ 限频触发,{retry_after}秒后重试...")
                    time.sleep(retry_after)
                    self._rate_limit_backoff = min(self._rate_limit_backoff * 1.5, 60)
                    continue

                response.raise_for_status()
                return response.json()

            except requests.exceptions.Timeout:
                print(f"  ⏱️ 请求超时,第{attempt+1}次重试")
                time.sleep(2 ** attempt + random.uniform(0, 1))
            except requests.exceptions.RequestException as e:
                print(f"  ❌ 请求失败: {e}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt + random.uniform(0, 1))

        return None

    def get_kline(self, symbol, interval="1d", limit=500):
        """获取历史K线数据"""
        result = self._request(
            "GET", "/market/kline",
            params={"symbol": symbol, "interval": interval, "limit": limit}
        )
        if result and result.get("code") == 0:
            return result.get("data", [])
        return []

    def get_available_symbols(self, market="US"):
        """获取可用交易品种列表"""
        result = self._request(
            "GET", "/symbols/available",
            params={"market": market}
        )
        if result and result.get("code") == 0:
            return [s["symbol"] for s in result.get("data", [])]
        return []


def fetch_pair_data(client, symbol_a, symbol_b, days=252, interval="1d"):
    """
    获取一对股票的历史K线数据并对齐
    ⚠️ 关键:停牌日会导致错位,必须显式处理
    """
    now = datetime.now()
    since = (now - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")

    raw_a = client.get_kline(symbol_a, interval=interval, limit=days + 50)
    raw_b = client.get_kline(symbol_b, interval=interval, limit=days + 50)

    if not raw_a or not raw_b:
        return None

    # 转换为 DataFrame 并对齐
    df_a = pd.DataFrame(raw_a)[["ts", "c"]].rename(columns={"c": symbol_a})
    df_b = pd.DataFrame(raw_b)[["ts", "c"]].rename(columns={"c": symbol_b})

    df_a["ts"] = pd.to_datetime(df_a["ts"], unit="ms")
    df_b["ts"] = pd.to_datetime(df_b["ts"], unit="ms")

    # 内连接对齐(自动去除停牌日不重叠的记录)
    merged = pd.merge(df_a, df_b, on="ts", how="inner")
    merged = merged.sort_values("ts").dropna()

    # ⚠️ 数据质量检查:缺失超过5%则丢弃
    expected_len = min(len(raw_a), len(raw_b))
    if len(merged) < expected_len * 0.95:
        print(f"  ⚠️ {symbol_a}/{symbol_b} 数据缺失率高,已丢弃")
        return None

    return merged.set_index("ts")


# ── 使用示例 ──
if __name__ == "__main__":
    try:
        client = TickDBClient()
        symbols = client.get_available_symbols(market="US")
        print(f"获取到 {len(symbols)} 只US股票")
    except Exception as e:
        print(f"初始化失败: {e}")

⚠️ 生产环境提示:全量遍历数千只股票需要数小时,建议使用多进程并行(multiprocessing.Pool),但要注意 TickDB 的 API 限频(code:3001)。可在进程间共享一个令牌桶限速器,确保总 QPS 不超过限制。

4.2 协整检验实现

from statsmodels.tsa.stattools import adfuller, coint

def calculate_half_life(spread):
    """
    计算价差的均值回复半衰期
    OU过程: Δy_t = λ·y_{t-1} + ε_t
    半衰期 = ln(2) / |λ|
    """
    spread_lag = spread.shift(1).dropna()
    delta = spread.diff().dropna()

    # 去掉NaN后对齐
    valid_idx = spread_lag.notna() & delta.notna()
    y = delta[valid_idx].values
    x = spread_lag[valid_idx].values

    if len(x) < 20:
        return np.nan

    # OLS: Δy = λ·lag + const
    x_with_const = np.column_stack([np.ones(len(x)), x])
    try:
        coeffs = np.linalg.lstsq(x_with_const, y, rcond=None)[0]
        lambda_hat = coeffs[1]

        if lambda_hat >= 0:
            return np.nan  # 不均值回复
        return np.log(2) / abs(lambda_hat)
    except:
        return np.nan


def cointegration_test(series_a, series_b, maxlag=None):
    """
    Engle-Granger两步法协整检验

    返回: {
        'coint_t': t统计量,
        'pvalue': p值,
        'beta': hedge ratio,
        'spread_std': 价差标准差,
        'half_life': 半衰期(天),
        'is_pair': bool
    }
    """
    if len(series_a) < 60:
        return None

    # Step 1: OLS回归 Y = α + β·X + ε
    x = np.column_stack([np.ones(len(series_b)), series_b])
    beta, residuals, _, _ = np.linalg.lstsq(x, series_a, rcond=None)
    alpha, hedge_ratio = beta[0], beta[1]
    spread = series_a - alpha - hedge_ratio * series_b

    # Step 2: ADF检验残差的平稳性
    # 注意:协整检验的临界值不是标准ADF临界值
    adf_result = adfuller(spread, maxlag=maxlag or 12, regression='c')
    t_stat, p_value = adf_result[0], adf_result[1]

    # 同步使用 statsmodels coint 函数做交叉验证
    coint_result = coint(series_a, series_b)
    coint_t, coint_p, _ = coint_result[0], coint_result[1], coint_result[2]

    # 取两种方法中更保守的p值
    p_value = max(p_value, coint_p)

    # 计算半衰期
    half_life = calculate_half_life(spread)

    # 综合判定
    is_pair = (
        p_value < 0.05 and
        not np.isnan(half_life) and
        1 < half_life < 60  # 半衰期在1-60天之间
    )

    return {
        'coint_t': min(t_stat, coint_t),  # 取更严格的统计量
        'pvalue': p_value,
        'alpha': alpha,
        'beta': hedge_ratio,
        'spread_mean': float(spread.mean()),
        'spread_std': float(spread.std()),
        'half_life': half_life,
        'n_observations': len(series_a),
        'is_pair': is_pair
    }


def screen_pairs_from_pool(client, symbol_pool, lookback_days=252, min_corr=0.3, max_corr=0.95):
    """
    从候选池中筛选协整配对

    参数:
        client: TickDBClient 实例
        symbol_pool: 股票代码列表
        lookback_days: 回看天数
        min_corr/max_corr: 相关性阈值(避免假相关和高相关陷阱)

    返回: 协整配对列表,按 pvalue 排序
    """
    results = []
    total_pairs = len(list(combinations(symbol_pool, 2)))

    print(f"开始协整检验,共 {total_pairs} 对候选...")
    print(f"预计耗时: {total_pairs * 0.1 / 3600:.1f} 小时 (单对约0.1秒)")

    for i, (sym_a, sym_b) in enumerate(combinations(symbol_pool, 2)):
        if i > 0 and i % 50 == 0:
            print(f"  进度: {i}/{total_pairs} ({i/total_pairs*100:.1f}%)")

        # 1. 预筛选:相关性过滤
        try:
            pair_data = fetch_pair_data(client, sym_a, sym_b, days=lookback_days)
            if pair_data is None:
                continue

            if len(pair_data) < 60:
                continue

            corr = pair_data[sym_a].corr(pair_data[sym_b])
            if abs(corr) < min_corr or abs(corr) > max_corr:
                continue

        except Exception as e:
            continue

        # 2. 协整检验
        try:
            result = cointegration_test(
                pair_data[sym_a].values,
                pair_data[sym_b].values
            )
            if result is None:
                continue

            result['symbol_a'] = sym_a
            result['symbol_b'] = sym_b
            result['correlation'] = corr

            if result['is_pair']:
                results.append(result)
                print(f"  ✅ 找到配对: {sym_a} / {sym_b} "
                      f"(p={result['pvalue']:.4f}, HL={result['half_life']:.1f}天)")

        except Exception as e:
            continue

    # 按 pvalue 升序排列(越小越强)
    results.sort(key=lambda x: x['pvalue'])
    return results


# ── 演示:针对两只股票做单对检验 ──
if __name__ == "__main__":
    # 典型配对示例:石油股 XLE 内部
    test_pairs = [
        ("XOM.US", "CVX.US"),   # 埃克森美孚 vs 雪佛龙
        ("SPY.US", "QQQ.US"),   # S&P500 ETF vs 纳斯达克100 ETF
        ("BRK.B.US", "SPY.US"), # 伯克希尔B vs SPY
    ]

    client = TickDBClient()
    for sym_a, sym_b in test_pairs:
        print(f"\n检验配对: {sym_a} / {sym_b}")
        pair_data = fetch_pair_data(client, sym_a, sym_b, days=252)
        if pair_data is not None:
            result = cointegration_test(pair_data[sym_a].values, pair_data[sym_b].values)
            if result:
                print(f"  相关性: {pair_data[sym_a].corr(pair_data[sym_b]):.4f}")
                print(f"  协整p值: {result['pvalue']:.4f}")
                print(f"  β(hedge ratio): {result['beta']:.4f}")
                print(f"  半衰期: {result['half_life']:.1f} 天")
                print(f"  是否为可套利配对: {'✅ 是' if result['is_pair'] else '❌ 否'}")

五、卡尔曼滤波:实时追踪动态对冲比率

5.1 为什么需要在线估计算法

OLS 给出的是一个事后静态的 hedge ratio,你在回测中使用它时,实际上假设了过去 252 天的平均关系会在明天继续。但实证研究表明,股票间的协整关系往往在财报季、并购传闻或宏观冲击后发生结构性漂移。

卡尔曼滤波的核心优势在于:它为每一时刻 t 提供了基于所有可用信息的最佳线性无偏估计(BLUE),同时这个估计天然地对近期数据赋予更高权重。

5.2 状态空间模型详解

状态方程(假设 hedge ratio 服从随机游走):
$$\beta_t = \beta_{t-1} + \omega_t, \quad \omega_t \sim N(0, Q)$$

观测方程
$$y_t = \alpha + \beta_t x_t + \nu_t, \quad \nu_t \sim N(0, R)$$

其中:

  • $y_t$:标的资产 A 的价格
  • $x_t$:标的资产 B 的价格(对冲工具)
  • $\beta_t$:时变的对冲比率(这是我们想追踪的状态)
  • $Q$:状态噪声方差($\beta$ 的变化速度)
  • $R$:观测噪声方差(价格中的随机扰动)

预测-更新循环

对于每个新观测 (x_t, y_t):

预测步:
  β_pred = β_post                    # 先验估计 = 上一步后验
  P_pred = P_post + Q                # 预测误差方差

更新步:
  K = P_pred / (P_pred + R)          # 卡尔曼增益
  β_post = β_pred + K(y_t - x_t·β_pred)  # 后验估计
  P_post = (1 - K)·P_pred           # 后验误差方差

5.3 生产级卡尔曼滤波器实现

class KalmanPairTrader:
    """
    基于卡尔曼滤波的动态配对交易策略

    关键参数:
        delta: 状态噪声的缩放因子 (建议: 1e-3 ~ 1e-2)
               delta越大 → hedge ratio 跟踪越灵敏 → 交易越频繁
        Ve: 观测噪声方差 (建议: 0.001 ~ 0.01)
    """

    def __init__(self, delta=1e-4, Ve=1e-3):
        self.delta = delta      # 状态噪声尺度
        self.Ve = Ve            # 观测噪声方差
        self.R = None           # 观测噪声协方差矩阵

        # 状态变量(随滤波器迭代更新)
        self.beta = None        # 动态 hedge ratio
        self.alpha = 0.0        # 价差均值
        self.P = None           # 估计误差协方差
        self.initialized = False

    def _init_state(self, y_0, x_0):
        """用前两个观测初始化状态"""
        self.beta = y_0 / x_0 if x_0 != 0 else 1.0
        self.alpha = 0.0
        self.P = np.array([[1.0]])  # 初始误差协方差
        self.R = np.array([[self.Ve]])
        self.initialized = True

    def update(self, y_t, x_t):
        """
        单步卡尔曼更新
        返回: (spread_t, beta_t, kalman_gain, stderr_t)
        """
        if not self.initialized:
            self._init_state(y_t, x_t)
            spread = y_t - self.alpha - self.beta * x_t
            return spread, self.beta, 0.0, np.sqrt(self.P[0, 0])

        # ── 预测步 ──
        beta_pred = self.beta          # 状态预测(随机游走)
        P_pred = self.P + self.delta   # 方差预测

        # ── 更新步 ──
        # 观测残差
        y_pred = self.alpha + beta_pred * x_t
        y_diff = y_t - y_pred

        # 残差方差 = P_pred·x_t² + R
        S = float(x_t ** 2 * P_pred + self.Ve)

        # 卡尔曼增益
        K = P_pred * x_t / S

        # 状态更新
        self.beta = float(beta_pred + K * y_diff)
        self.alpha = float(self.alpha)  # 这里简化:alpha固定
        self.P = float(P_pred * (1 - K * x_t))

        # 计算当前价差(卡尔曼平滑后的spread)
        spread = y_t - self.alpha - self.beta * x_t
        kalman_gain = float(K)

        return spread, self.beta, kalman_gain, np.sqrt(self.P)

    def batch_update(self, prices_a, prices_b):
        """
        批量更新,返回完整的价差序列和对冲比率序列
        用于回测和历史分析
        """
        spreads = []
        betas = []
        gains = []
        stderrs = []

        for y_t, x_t in zip(prices_a, prices_b):
            spread, beta, gain, stderr = self.update(y_t, x_t)
            spreads.append(spread)
            betas.append(beta)
            gains.append(gain)
            stderrs.append(stderr)

        return {
            'spread': np.array(spreads),
            'beta': np.array(betas),
            'kalman_gain': np.array(gains),
            'std_error': np.array(stderrs)
        }


def backtest_kalman_pair(
    client,
    symbol_a,
    symbol_b,
    lookback=252,
    entry_threshold=2.0,
    exit_threshold=0.5,
    z_window=20,
    hedge_ratio_estimator="kalman"
):
    """
    配对交易回测(使用卡尔曼滤波动态对冲比率)

    策略逻辑:
    1. 用过去 lookback 天初始化卡尔曼滤波器
    2. 每日更新卡尔曼状态,获取动态价差
    3. Z-score > entry_threshold → 开仓(价差高估或低估)
    4. Z-score < exit_threshold → 平仓
    5. 每日收盘调仓
    """
    # 获取数据
    pair_data = fetch_pair_data(client, symbol_a, symbol_b, days=lookback + 60)
    if pair_data is None or len(pair_data) < lookback:
        return None

    train_data = pair_data.iloc[:lookback]
    test_data = pair_data.iloc[lookback:]

    # ── 训练期:初始化卡尔曼滤波器 ──
    kf = KalmanPairTrader(delta=1e-4, Ve=1e-3)
    train_result = kf.batch_update(
        train_data[symbol_a].values,
        train_data[symbol_b].values
    )

    # 用训练期数据校准Z-score基准
    spread_mean = np.mean(train_result['spread'])
    spread_std = np.std(train_result['spread'])
    initial_beta = train_result['beta'][-1]

    # ── 测试期:逐日回测 ──
    position = 0          # 当前持仓: +1(A多B空), -1(A空B多), 0(空仓)
    entry_spread = 0
    trades = []

    # 用滚动窗口计算实时Z-score
    recent_spreads = list(train_result['spread'][-z_window:])

    for i, (idx, row) in enumerate(test_data.iterrows()):
        y_t, x_t = row[symbol_a], row[symbol_b]

        if i == 0:
            kf._init_state(train_data[symbol_a].iloc[-1], train_data[symbol_b].iloc[-1])

        # 卡尔曼更新
        spread_t, beta_t, _, _ = kf.update(y_t, x_t)
        recent_spreads.append(spread_t)

        # 滚动窗口Z-score
        window_spreads = recent_spreads[-z_window:]
        z_score = (spread_t - np.mean(window_spreads)) / (np.std(window_spreads) + 1e-10)

        date_str = idx.strftime('%Y-%m-%d')

        # ── 交易逻辑 ──
        if position == 0:
            if z_score > entry_threshold:
                position = 1
                entry_spread = spread_t
                trades.append({
                    'date': date_str, 'action': 'SHORT_SPREAD',
                    'z_score': z_score, 'beta': beta_t
                })
            elif z_score < -entry_threshold:
                position = -1
                entry_spread = spread_t
                trades.append({
                    'date': date_str, 'action': 'LONG_SPREAD',
                    'z_score': z_score, 'beta': beta_t
                })

        elif position == 1:
            if z_score < exit_threshold:
                pnl = entry_spread - spread_t
                trades.append({
                    'date': date_str, 'action': 'CLOSE_SHORT',
                    'z_score': z_score, 'pnl': pnl, 'beta': beta_t
                })
                position = 0

        elif position == -1:
            if z_score > -exit_threshold:
                pnl = spread_t - entry_spread
                trades.append({
                    'date': date_str, 'action': 'CLOSE_LONG',
                    'z_score': z_score, 'pnl': pnl, 'beta': beta_t
                })
                position = 0

    # 计算绩效指标
    closed_trades = [t for t in trades if 'pnl' in t]
    if not closed_trades:
        return {'symbol_a': symbol_a, 'symbol_b': symbol_b, 'n_trades': 0}

    pnls = [t['pnl'] for t in closed_trades]
    n_wins = sum(1 for p in pnls if p > 0)
    n_losses = sum(1 for p in pnls if p <= 0)

    # 简化PnL(未计入交易成本,实际需扣除佣金+滑点)
    total_pnl = sum(pnls)
    avg_pnl = np.mean(pnls)
    win_rate = n_wins / len(pnls) if pnls else 0

    return {
        'symbol_a': symbol_a, 'symbol_b': symbol_b,
        'n_trades': len(closed_trades),
        'n_wins': n_wins, 'n_losses': n_losses,
        'win_rate': win_rate,
        'total_pnl': total_pnl,
        'avg_pnl_per_trade': avg_pnl,
        'final_beta': kf.beta,
        'beta_drift': abs(kf.beta - initial_beta)
    }


# ── 演示:单对回测 ──
if __name__ == "__main__":
    client = TickDBClient()

    print("演示卡尔曼滤波动态对冲效果")
    print("=" * 50)

    for sym_a, sym_b in [("SPY.US", "QQQ.US"), ("XOM.US", "CVX.US")]:
        pair_data = fetch_pair_data(client, sym_a, sym_b, days=300)
        if pair_data is not None:
            kf = KalmanPairTrader(delta=1e-4)
            result = kf.batch_update(
                pair_data[sym_a].values[:200],
                pair_data[sym_b].values[:200]
            )

            print(f"\n配对 {sym_a} / {sym_b}:")
            print(f"  初始 β: {result['beta'][0]:.4f}")
            print(f"  最终 β: {result['beta'][-1]:.4f}")
            print(f"  β 变化幅度: {abs(result['beta'][-1] - result['beta'][0]):.4f}")
            print(f"  平均卡尔曼增益: {np.mean(result['kalman_gain'][10:]):.6f}")
            print(f"  增益递减: {result['kalman_gain'][10] > result['kalman_gain'][-1]}")

六、完整配对筛选系统

将以上组件整合为一个端到端的筛选系统:

class PairsScreener:
    """
    完整配对筛选系统

    使用四阶段漏斗:
    1. 按行业/板块分组候选池
    2. 相关性预筛
    3. 协整检验
    4. 半衰期 + 波动率质量评估
    """

    def __init__(self, client, min_observations=120):
        self.client = client
        self.min_observations = min_observations

    def run_full_screen(
        self,
        symbols,
        lookback_days=252,
        min_corr=0.3,
        max_corr=0.95,
        max_half_life=60,
        min_half_life=1,
        verbose=True
    ):
        """执行完整筛选流程"""
        from itertools import combinations
        import concurrent.futures

        start_time = time.time()
        pairs_checked = 0
        pairs_passed_corr = 0
        final_pairs = []

        candidate_pairs = list(combinations(symbols, 2))

        if verbose:
            print(f"📊 开始全量筛选: {len(candidate_pairs)} 对候选")
            print(f"   参数: lookback={lookback_days}天, "
                  f"corr∈[{min_corr},{max_corr}], "
                  f"HL∈[{min_half_life},{max_half_life}]天")
            print()

        for sym_a, sym_b in candidate_pairs:
            pairs_checked += 1

            # 阶段1+2: 数据获取 + 相关性预筛
            try:
                pair_data = fetch_pair_data(
                    self.client, sym_a, sym_b, days=lookback_days
                )
                if pair_data is None or len(pair_data) < self.min_observations:
                    continue

                corr = pair_data[sym_a].corr(pair_data[sym_b])
                if abs(corr) < min_corr or abs(corr) > max_corr:
                    continue

                pairs_passed_corr += 1

            except Exception:
                continue

            # 阶段3: 协整检验
            try:
                coint_result = cointegration_test(
                    pair_data[sym_a].values,
                    pair_data[sym_b].values
                )
                if coint_result is None or not coint_result['is_pair']:
                    continue

                hl = coint_result['half_life']
                if hl < min_half_life or hl > max_half_life:
                    continue

                # 质量评估:价差波动率不能太低(否则手续费侵蚀利润)
                spread = pair_data[sym_a] - coint_result['alpha'] - \
                         coint_result['beta'] * pair_data[sym_b]
                spread_vol = spread.std()
                price_scale = min(pair_data[sym_a].mean(), pair_data[sym_b].mean())

                if spread_vol / price_scale < 0.005:  # 相对波动<0.5%
                    continue

                coint_result['symbol_a'] = sym_a
                coint_result['symbol_b'] = sym_b
                coint_result['correlation'] = corr
                coint_result['relative_vol'] = float(spread_vol / price_scale)
                final_pairs.append(coint_result)

                if verbose:
                    print(f"  ✅ [{len(final_pairs):3d}] {sym_a} / {sym_b}  "
                          f"| ρ={corr:.3f} "
                          f"| p={coint_result['pvalue']:.4f} "
                          f"| HL={hl:.1f}天 "
                          f"| β={coint_result['beta']:.3f}")

            except Exception as e:
                continue

        elapsed = time.time() - start_time

        # 按p值排序
        final_pairs.sort(key=lambda x: x['pvalue'])

        # 统计摘要
        summary = {
            'total_pairs_checked': pairs_checked,
            'pairs_passed_corr': pairs_passed_corr,
            'pairs_final': len(final_pairs),
            'elapsed_seconds': elapsed,
            'throughput': pairs_checked / elapsed if elapsed > 0 else 0,
            'results': final_pairs
        }

        if verbose:
            print(f"\n📈 筛选完成:")
            print(f"   检验总数: {pairs_checked}")
            print(f"   相关性通过: {pairs_passed_corr} ({pairs_passed_corr/pairs_checked*100:.1f}%)")
            print(f"   协整+质量通过: {len(final_pairs)}")
            print(f"   耗时: {elapsed:.1f}秒 ({summary['throughput']:.1f} 对/秒)")

        return summary


# ── 使用示例 ──
if __name__ == "__main__":
    client = TickDBClient()

    # 为演示,使用一个小型候选池(实际使用时替换为全市场股票池)
    demo_symbols = [
        "XOM.US", "CVX.US", "COP.US", "SLB.US",   # 能源
        "JPM.US", "BAC.US", "WFC.US", "GS.US",     # 金融
        "JNJ.US", "PFE.US", "UNH.US", "ABBV.US",   # 医疗
        "AAPL.US", "MSFT.US", "GOOGL.US", "META.US", # 科技
    ]

    screener = PairsScreener(client)
    report = screener.run_full_screen(
        symbols=demo_symbols,
        lookback_days=252,
        min_corr=0.4,
        max_corr=0.95,
        min_half_life=3,
        max_half_life=40
    )

    if report['results']:
        print("\n🏆 最终配对池(Top 5):")
        print(f"{'配对':<30} {'相关性':>8} {'p值':>8} {'半衰期':>8} {'相对波动':>10}")
        print("-" * 70)
        for p in report['results'][:5]:
            pair_name = f"{p['symbol_a']} / {p['symbol_b']}"
            print(f"{pair_name:<30} {p['correlation']:>8.4f} "
                  f"{p['pvalue']:>8.4f} {p['half_life']:>8.1f} "
                  f"{p['relative_vol']:>10.4f}")

七、回测结果解读与陷阱

7.1 典型的协整配对画像

从大量实证研究中,可交易的协整配对通常有以下特征:

特征 合理范围 异常信号
p值 < 0.01(强协整) > 0.05
半衰期 5-30 天 < 2天(太贵)或 > 60天(太慢)
相关性 0.5-0.9 < 0.3 或 > 0.98
β的年化漂移 < 20% > 50%(结构性断裂)
相对价差波动 0.01-0.05 < 0.005(不够覆盖交易成本)

7.2 三大经典陷阱

陷阱一:lookahead bias

你在 2024 年做回测,发现 2019-2023 年的协整配对表现完美。但很多配对之所以"历史上协整",是因为后来的结构性变化还没有发生。解决方案:只用截至回测起点的数据来发现配对,每次重新筛选时不要用未来信息。

陷阱二:幸存者偏差

你的股票池里只包含当前还活着的公司。但历史上退市、破产、被并购的股票同样会出现在"历史配对"中——而这些股票如果在回测期内退市,往往会导致协整关系突然断裂。解决方案:使用包含退市股票的完整历史数据库(如 Compustat、CRSP),或至少在模拟中记录"如果持仓标的退市怎么办"的应急预案。

陷阱三:交易成本侵蚀

很多回测中的"盈利"在扣除真实交易成本后变成亏损。以美股为例:

成本项 单次交易估算
佣金 $0.005/股
SEC 费 $0.000119/股(卖出时)
滑点(流动性差的配对) 0.01-0.05%
价差成本 Bid-Ask spread 均值

一对半衰期 5 天的配对,理想情况下每月交易 3-4 次。如果每次交易的隐性成本是 0.03%,年化就是 1%+。只有价差波动率 > 0.015/天 的配对,才有可能覆盖这些成本

回测局限性说明:上述回测结果基于 TickDB 历史 K 线数据模拟,不构成未来收益保证。回测中存在以下未建模因素:实际交易中的滑点(特别是在流动性枯竭时);隔夜持仓的隔夜风险溢价;极端行情下价差不回归的概率;以及最重要的——配对协整关系的结构性断裂风险。建议在实际部署前进行更严格的样本外测试和压力测试。


八、为什么这个框架值得你花时间构建

统计套利的核心竞争优势,不在于你发现了某个神奇指标,而在于你有一套可持续、可扩展、可解释的配对发现和风险管理流程。

这套流程的价值在于三个层次的壁垒:

第一层:数据处理壁垒。历史 K 线数据的对齐、缺失值处理、复权处理,这些脏活累活做好了,后面的统计检验才有意义。TickDB 提供清洗对齐的 10 年级别美股历史数据,意味着你可以做跨越多个经济周期的配对稳定性分析。

第二层:方法论壁垒。协整检验 + 卡尔曼滤波的组合,比单纯用相关系数筛选的策略,在结构性变化面前有显著更强的鲁棒性。

第三层:执行壁垒。知道一个配对会回归,和能在合理成本内执行这个回归,是两件完全不同的事。动态对冲比率的实时更新能力,直接决定了你在价差回归路上能保留多少利润。


下一步行动

如果你想亲手实现这套筛选系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key,设置环境变量 TICKDB_API_KEY
  3. 复制本文代码,将 demo_symbols 替换为你想要扫描的完整股票池
  4. 从小候选池(如 20 只股票,190 对)开始,验证流程后再扩大规模

如果你对卡尔曼滤波的理论推导感兴趣:建议阅读 Hamilton Time Series Analysis 第 13 章(状态空间模型)和 Algorithmic Trading(D. J. Stefanini)关于卡尔曼滤波配对交易的章节。

如果你在构建自己的配对监控系统:建议将卡尔曼滤波的 beta 估计误差(std_error)作为一个风控指标——当 std_error 突然增大时,往往意味着市场微观结构发生了变化,应当触发告警并暂停该配对的自动交易。

如果你需要更长的历史数据做多周期验证:联系 [email protected] 了解机构级数据方案,支持更长回测周期和完整的 tick 级逐笔数据(适用于港股和数字货币)。


风险提示:本文不构成任何投资建议。统计套利策略存在模型风险、执行风险和流动性风险。历史表现不代表未来收益。在实际交易前,请确保充分理解策略逻辑并进行充分的样本外测试。市场有风险,投资需谨慎。