美股统计套利:跨标的协整配对交易的完整实现

"市场在短期内是投票机,长期是称重机。"——本杰明·格雷厄姆

1980 年代,纽约证券交易所的做市商柜台旁,格里·班伯格(Gerry Bamberger)观察到一个现象:两只看似毫无关联的航空股,其价格走势却呈现出惊人的同步性——每当Delta Airlines上涨,American Airlines总会在数分钟后跟随,而这种短暂的背离最终会回归。班伯格没有急于追涨杀跌,而是开始量化这种"背离-回归"的幅度与周期。他或许没有意识到,自己正在开创一个延续至今的量化策略范式:统计套利。

四十年后,协整配对交易(Cointegration Pairs Trading)已从华尔街的神秘黑箱演变为开源社区的经典教程。然而,当开发者试图将论文中的数学公式转化为生产级代码时,往往会发现三道鸿沟:如何在数千只美股中高效筛选协整对?历史回测中的"未来函数"陷阱如何规避?实盘监控中价差的剧烈波动如何实时捕获?

本文将从协整检验的统计原理出发,依次解决这三道鸿沟,给出从历史筛选到实时监控的完整闭环。全文包含可直接运行的生产级 Python 代码,数据获取部分采用 TickDB REST API。


一、从"虚假相关"到"真正协整"

1.1 相关性的致命陷阱

许多量化新手的第一步是计算两只股票的皮尔逊相关系数。相关性越高,似乎越适合做配对——这是一个危险的假设。

考虑一个经典案例:S&P 500 指数与北卡罗来纳州的溺水死亡人数,历史上相关系数高达 0.87。但这并不意味着买指数能预测溺水——两者都与"夏季"这个混淆变量强相关。相关性只告诉你两个变量"一起动",不告诉你"为什么一起动",更不告诉你"背离后会不会回来"。

在股票配对中,这个陷阱表现为:两只科技股可能因为同受"科技板块整体走势"驱动而呈现高相关,但当大盘下跌时,它们可能永远不会再回归到历史价差水平。

1.2 协整的统计定义

协整(Cointegration)回答的是更深层的问题:如果两个时间序列都是"不稳定的"(随机游走),是否存在一个线性组合,使这个组合变成"稳定的"(均值回归)?

形式化表述:对于两个 I(1) 序列 $X_t$ 和 $Y_t$(一阶单整,即一阶差分后平稳),如果存在一个参数 $\beta$,使得:

$$Z_t = X_t - \beta Y_t \sim I(0)$$

即残差序列 $Z_t$ 是平稳的,则称 $X_t$ 和 $Y_t$ 是协整的,$\beta$ 为协整向量。

核心直觉:两只股票可能各自随机游走,但它们之间的"差距"会围绕一个常数均值波动。这个"差距"就是配对交易的利润来源——当价差偏离均值时做均值回归。

1.3 Engle-Granger 两步法

工业界最常用的协整检验方法是 Engle-Granger 两步法:

第一步:OLS 回归。假设 $Y_t = \alpha + \beta X_t + \epsilon_t$,用最小二乘法估计 $\hat{\alpha}$ 和 $\hat{\beta}$。

第二步:ADF 检验。对残差 $\hat{\epsilon}_t = Y_t - \hat{\alpha} - \hat{\beta} X_t$ 进行单位根检验。如果拒绝单位根假设,则认为 $X_t$ 和 $Y_t$ 协整。

import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint

def engle_granger_test(series1: np.ndarray, series2: np.ndarray) -> dict:
    """
    Engle-Granger 两步法协整检验
    
    Returns:
        dict: 包含 t-statistic, p-value, 是否协整 的字典
    """
    # 合并数据,处理 NaN
    df = pd.DataFrame({'x': series1, 'y': series2}).dropna()
    
    if len(df) < 30:
        return {'coint': False, 'reason': '样本量不足'}
    
    # statsmodels 内置协整检验( Engle-Granger 方法)
    score, pvalue, _ = coint(df['x'], df['y'])
    
    # 通常 p-value < 0.05 认为协整
    is_coint = pvalue < 0.05
    
    return {
        'coint': is_coint,
        't_stat': score,
        'p_value': pvalue,
        'significant': pvalue < 0.01,  # 更严格的阈值
        'sample_size': len(df)
    }

二、从数千只股票中筛选候选配对

2.1 筛选流程设计

纽交所和纳斯达克共有超过 8,000 只股票,穷举所有配对需要 $C_2^{8000} \approx 32,000,000$ 次协整检验,这在计算上是不现实的。需要一个三阶段筛选流水线:

阶段一:流动性过滤
    ↓
阶段二:行业/板块相关性预筛选
    ↓
阶段三:协整检验

阶段一:流动性过滤。剔除日均成交量(ADTV)低于阈值、股价过低(可能被合股预期影响)、波动率异常(财报季前后)的标的。这一步将候选池缩减至 500-1000 只。

阶段二:行业/板块相关性预筛选。协整不要求同行业,但同行业股票受相同基本面驱动,历史上协整的概率更高。可按 GICS 行业分类先验缩小范围。

阶段三:协整检验。对候选对逐一执行 ADF 检验,记录 p-value,筛选阈值内的配对。

2.2 行业配对候选表

以下是美股中历史上协整关系较为稳定的行业配对类型:

行业组合 代表配对 协整逻辑
大型航空 DAL / UAL 运力、燃油成本、旅客需求高度同步
综合能源 XOM / CVX 全球油价传导、资本配置相似
四大银行 JPM / BAC / WFC / C 同业拆借利率、信贷周期共振
半导体设备 ASML / LRCX / AMAT 晶圆厂资本开支周期同步
保险巨头 BRK.B / TRV / CB 巨灾风险、利率环境趋同
零售巨头 WMT / TGT 消费必需品定价权博弈

2.3 生产级筛选代码

以下代码实现完整的配对筛选流水线。历史 K 线数据通过 TickDB API 获取,支持按需指定时间范围和标的数量:

import os
import time
import itertools
import requests
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

# ============================================================
# TickDB API 调用封装
# ============================================================

class TickDBClient:
    """TickDB REST API 封装"""
    
    BASE_URL = "https://api.tickdb.ai/v1"
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置 TICKDB_API_KEY 环境变量")
    
    def _headers(self) -> dict:
        return {"X-API-Key": self.api_key}
    
    def get_kline(
        self,
        symbol: str,
        interval: str = "1d",
        start_time: int = None,
        end_time: int = None,
        limit: int = 500
    ) -> pd.DataFrame:
        """
        获取历史 K 线数据
        
        Args:
            symbol: 交易品种,如 'JPM.US'
            interval: K 线周期,'1d'/'1h'/'5m' 等
            start_time: 毫秒级 Unix 时间戳
            end_time: 毫秒级 Unix 时间戳
            limit: 单次请求最大条数(TickDB 默认上限)
        
        Returns:
            DataFrame,包含 open/high/low/close/vol 等列
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        if start_time:
            params["start_time"] = start_time
        if end_time:
            params["end_time"] = end_time
        
        response = requests.get(
            f"{self.BASE_URL}/market/kline",
            headers=self._headers(),
            params=params,
            timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
        )
        
        # TickDB 标准错误处理
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"频率超限,等待 {retry_after} 秒")
            time.sleep(retry_after)
            return self.get_kline(symbol, interval, start_time, end_time, limit)
        
        data = response.json()
        if data.get("code") != 0:
            raise RuntimeError(f"API 错误: {data}")
        
        return pd.DataFrame(data["data"])


# ============================================================
# 配对筛选流水线
# ============================================================

class PairsScreener:
    """协整配对筛选器"""
    
    def __init__(
        self,
        db_client: TickDBClient,
        min_adtv: int = 1_000_000,  # 最小日均成交量
        min_price: float = 5.0,      # 最小股价
        lookback_days: int = 252,    # 回看天数(约一年)
        coint_pvalue_threshold: float = 0.05
    ):
        self.db = db_client
        self.min_adtv = min_adtv
        self.min_price = min_price
        self.lookback_days = lookback_days
        self.coint_threshold = coint_pvalue_threshold
        
        # 计算时间范围(TickDB 使用毫秒级时间戳)
        self.end_time = int(time.time() * 1000)
        self.start_time = self.end_time - (lookback_days * 24 * 3600 * 1000)
    
    def fetch_price_series(self, symbol: str) -> pd.Series:
        """
        获取单只标的的收盘价序列
        
        ⚠️ 注意:实际生产中应加入心跳/重连机制
        """
        df = self.db.get_kline(
            symbol,
            interval="1d",
            start_time=self.start_time,
            end_time=self.end_time
        )
        
        if df.empty or len(df) < 60:
            return pd.Series(dtype=float)
        
        # 返回收盘价序列
        return df.set_index('time')['close'].sort_index()
    
    def compute_adtv(self, symbol: str) -> float:
        """计算日均成交量"""
        df = self.db.get_kline(
            symbol,
            interval="1d",
            start_time=self.start_time,
            end_time=self.end_time
        )
        
        if df.empty or 'vol' not in df.columns:
            return 0.0
        
        return df['vol'].tail(30).mean()
    
    def liquidity_filter(self, symbols: List[str]) -> List[str]:
        """
        阶段一:流动性过滤
        
        ⚠️ 生产环境建议异步批量请求,避免阻塞
        """
        filtered = []
        
        for sym in symbols:
            try:
                # 并行检查价格和成交量
                price_series = self.fetch_price_series(sym)
                if price_series.empty:
                    continue
                
                # 最新价格 > 阈值
                current_price = price_series.iloc[-1]
                if current_price < self.min_price:
                    continue
                
                # 日均成交量 > 阈值
                adtv = self.compute_adtv(sym)
                if adtv < self.min_adtv:
                    continue
                
                filtered.append(sym)
                print(f"✓ {sym}: 价格=${current_price:.2f}, ADTV=${adtv/1e6:.1f}M")
                
            except Exception as e:
                print(f"✗ {sym}: {e}")
                continue
        
        return filtered
    
    def coint_test_pair(
        self,
        series1: pd.Series,
        series2: pd.Series
    ) -> Dict:
        """对一对标的执行协整检验"""
        from statsmodels.tsa.stattools import coint
        
        # 对齐数据
        df = pd.DataFrame({'s1': series1, 's2': series2}).dropna()
        
        if len(df) < 60:
            return {'coint': False, 'reason': '样本不足'}
        
        score, pvalue, _ = coint(df['s1'], df['s2'])
        
        return {
            'coint': pvalue < self.coint_threshold,
            'p_value': pvalue,
            't_stat': score,
            'sample_size': len(df)
        }
    
    def screen_pairs(
        self,
        symbols: List[str],
        max_pairs: int = 100
    ) -> pd.DataFrame:
        """
        完整筛选流程
        
        Returns:
            DataFrame,包含候选配对及其统计量
        """
        print(f"\n=== 阶段一:流动性过滤 ===")
        liquid_symbols = self.liquidity_filter(symbols)
        print(f"\n流动性过滤后剩余: {len(liquid_symbols)} 只\n")
        
        if len(liquid_symbols) < 2:
            return pd.DataFrame()
        
        print(f"=== 阶段二:协整检验 ===")
        print(f"待检验组合数: {len(liquid_symbols) * (len(liquid_symbols) - 1) // 2}")
        
        results = []
        
        # ⚠️ 生产环境建议:
        # 1. 加入指数退避重连
        # 2. 使用 asyncio 并发请求
        # 3. 加入速率限制
        
        for i, sym1 in enumerate(liquid_symbols):
            for sym2 in liquid_symbols[i+1:]:
                try:
                    series1 = self.fetch_price_series(sym1)
                    series2 = self.fetch_price_series(sym2)
                    
                    result = self.coint_test_pair(series1, series2)
                    
                    if result['coint']:
                        results.append({
                            'symbol1': sym1,
                            'symbol2': sym2,
                            'p_value': result['p_value'],
                            't_stat': result['t_stat'],
                            'sample_size': result['sample_size']
                        })
                        print(f"✓ 候选配对: {sym1} / {sym2} (p={result['p_value']:.4f})")
                    
                except Exception as e:
                    print(f"✗ {sym1}/{sym2}: {e}")
                    continue
        
        df_results = pd.DataFrame(results)
        
        if not df_results.empty:
            df_results = df_results.sort_values('p_value').head(max_pairs)
        
        return df_results

2.4 回测中的"未来函数"陷阱

重要警告:配对筛选必须使用样本内数据,验证必须使用样本外数据。常见的"未来函数"陷阱包括:

陷阱类型 描述 规避方法
全局最优陷阱 用全部历史数据筛选配对,然后在相同数据上回测 时间序列切分:筛选期(2018-2021)/ 验证期(2022-2023)/ 测试期(2024)
幸存者偏差 只选用当前仍存在的股票,忽略退市标的 使用当年全量股票列表(含退市)回测
前视偏差 计算指标时不经意使用了未来数据 确保每个时间点只使用截至该点的历史数据

三、价差 Z-Score:信号生成的量化框架

3.1 价差与价差的数学形式

协整检验给出的协整向量 $\beta$ 定义了价差的计算方式:

$$\text{spread}_t = X_t - \beta Y_t$$

其中 $\beta$ 通过 OLS 回归拟合得到。然而,$\beta$ 本身可能随时间漂移。滚动协整(Rolling Cointegration)通过固定窗口计算滚动 $\beta$,能更好地适应市场结构变化:

$$\beta_t = \frac{\text{Cov}(X_{t-n:t}, Y_{t-n:t})}{\text{Var}(Y_{t-n:t})}$$

3.2 Z-Score 标准化

直接使用价差绝对值作为信号是不可行的,因为不同配对的价差波动幅度差异巨大。Z-Score 将价差标准化到均值 0、标准差 1 的尺度:

$$\text{Z-score}t = \frac{\text{spread}t - \mu{\text{spread}}}{\sigma{\text{spread}}}$$

其中 $\mu$ 和 $\sigma$ 通常使用滚动窗口计算(避免未来数据泄露)。

3.3 布林带类信号设计

工业级配对交易系统通常设置多层阈值:

阈值 含义 动作
Z > 2.0 价差显著高于历史均值 卖出 spread(做空 X,做多 Y)
Z < -2.0 价差显著低于历史均值 买入 spread(做多 X,做空 Y)
Z 回归至 0 价差回归 平仓获利
Z > 3.0 或 < -3.0 极端偏离,可能趋势延续 止损或趋势追踪
class SpreadZScore:
    """价差 Z-Score 计算器(滚动窗口版本)"""
    
    def __init__(
        self,
        window: int = 20,           # 滚动窗口
        z_entry: float = 2.0,        # 入场阈值
        z_exit: float = 0.0,        # 出场阈值
        z_stop: float = 3.0         # 止损阈值
    ):
        self.window = window
        self.z_entry = z_entry
        self.z_exit = z_exit
        self.z_stop = z_stop
    
    def calculate_beta(
        self,
        x: pd.Series,
        y: pd.Series
    ) -> pd.Series:
        """滚动协整系数 beta"""
        return (
            x.rolling(self.window)
            .cov(y) / 
            y.rolling(self.window).var()
        )
    
    def calculate_spread(
        self,
        x: pd.Series,
        y: pd.Series
    ) -> pd.Series:
        """计算价差 spread = x - beta * y"""
        beta = self.calculate_beta(x, y)
        # 对齐索引
        aligned = pd.DataFrame({'x': x, 'y': y, 'beta': beta}).dropna()
        return aligned['x'] - aligned['beta'] * aligned['y']
    
    def calculate_zscore(self, spread: pd.Series) -> pd.Series:
        """
        计算 Z-Score
        
        ⚠️ 滚动 Z-Score 中,mean 和 std 使用截至当前 t 的历史窗口
        ⚠️ 这要求数据严格按时间顺序处理
        """
        mean = spread.rolling(self.window).mean().shift(1)  # shift(1) 避免前视偏差
        std = spread.rolling(self.window).std().shift(1)
        
        return (spread - mean) / std
    
    def generate_signals(self, zscore: pd.Series) -> pd.Series:
        """
        基于 Z-Score 生成交易信号
        
        Returns:
            1: 做多 spread (做多 X,做空 Y)
            -1: 做空 spread (做空 X,做多 Y)
            0: 持仓不变
        """
        signals = pd.Series(0, index=zscore.index)
        
        # 入场逻辑
        signals[zscore < -self.z_entry] = 1    # 低于 -2.0,做多
        signals[zscore > self.z_entry] = -1     # 高于 2.0,做空
        
        # 止损逻辑
        stop_long = (signals == 1) & (zscore < -self.z_stop)
        stop_short = (signals == -1) & (zscore > self.z_stop)
        signals[stop_long | stop_short] = 0
        
        return signals

四、实时监控:从历史回测到实盘推送

4.1 实盘与回测的三大差异

回测代码可以"假装"知道未来的收盘价,实盘则必须实时处理新数据。三大核心差异:

差异维度 回测假设 实盘要求
数据频率 可用固定周期(1d/1h)的收盘价 需要逐 tick 更新(毫秒级)
信号触发 收盘价恰好等于阈值 价格在阈值附近反复穿越,需防抖动
执行滑点 固定比例滑点 流动性分层,实际滑点因价差深度而异

4.2 WebSocket 实时订阅架构

以下代码使用 TickDB WebSocket API 订阅两只股票的实时行情,并在价格变化时更新价差和 Z-Score:

import json
import time
import random
import asyncio
import websocket
from collections import deque
from typing import Dict, Optional

class RealTimePairsMonitor:
    """
    配对交易实时监控器
    
    ⚠️ 以下为简化版本,生产环境需加入:
    1. 心跳保活(ping/pong)
    2. 指数退避 + 抖动重连
    3. 限频处理(code:3001)
    4. 异步架构(asyncio)
    """
    
    WS_URL = "wss://stream.tickdb.ai/v1/ws/market"
    
    def __init__(
        self,
        symbol1: str,
        symbol2: str,
        history_prices: Dict[str, list],
        beta: float,
        z_window: int = 20
    ):
        self.symbol1 = symbol1
        self.symbol2 = symbol2
        self.beta = beta
        
        # 初始化历史价格队列
        self.prices1 = deque(history_prices.get(symbol1, []), maxlen=z_window)
        self.prices2 = deque(history_prices.get(symbol2, []), maxlen=z_window)
        
        # Z-Score 参数
        self.z_window = z_window
        
        # 持仓状态
        self.position = 0  # 1: long spread, -1: short spread, 0: flat
        self.entry_zscore = 0
        
        # WebSocket 连接
        self.ws: Optional[websocket.WebSocketApp] = None
        
    def _build_subscribe_message(self) -> dict:
        """构建 TickDB WebSocket 订阅消息"""
        return {
            "cmd": "subscribe",
            "params": {
                "channels": [f"kline_{self.symbol1}_1m", f"kline_{self.symbol2}_1m"]
            }
        }
    
    def calculate_current_zscore(self, price1: float, price2: float) -> float:
        """
        计算当前 Z-Score
        
        ⚠️ 简化版:使用固定 beta 和固定窗口 std
        ⚠️ 生产版:应滚动更新 beta 和使用滚动 mean/std
        """
        # 更新价格队列
        self.prices1.append(price1)
        self.prices2.append(price2)
        
        if len(self.prices1) < self.z_window:
            return 0.0
        
        # 计算当前 spread
        spread = price1 - self.beta * price2
        
        # 计算历史 spread 序列
        spreads = [
            p1 - self.beta * p2 
            for p1, p2 in zip(list(self.prices1)[:-1], list(self.prices2)[:-1])
        ]
        spreads.append(spread)
        
        # 滚动 Z-Score(shift(1) 确保不前视)
        mean = sum(spreads[:-1]) / len(spreads[:-1])
        variance = sum((s - mean) ** 2 for s in spreads[:-1]) / (len(spreads[:-1]) - 1)
        std = variance ** 0.5
        
        if std < 1e-10:
            return 0.0
        
        return (spread - mean) / std
    
    def on_message(self, ws, message: str):
        """处理接收到的 WebSocket 消息"""
        try:
            data = json.loads(message)
            
            # TickDB WebSocket 心跳处理
            if data.get("type") == "pong":
                return
            
            # 解析 K 线数据
            if "data" in data:
                kline = data["data"]
                symbol = kline.get("symbol")
                close_price = float(kline.get("close", 0))
                
                if symbol == self.symbol1:
                    self._process_price(symbol, close_price)
                elif symbol == self.symbol2:
                    self._process_price(symbol, close_price)
                    
        except json.JSONDecodeError:
            pass
        except Exception as e:
            print(f"消息处理错误: {e}")
    
    def _process_price(self, symbol: str, price: float):
        """处理价格更新"""
        timestamp = time.strftime("%H:%M:%S")
        
        # 计算当前 Z-Score
        if symbol == self.symbol1:
            price2 = self.prices2[-1] if self.prices2 else None
            if price2:
                zscore = self.calculate_current_zscore(price, price2)
                self._check_signals(zscore, timestamp)
        else:
            price1 = self.prices1[-1] if self.prices1 else None
            if price1:
                zscore = self.calculate_current_zscore(price1, price)
                self._check_signals(zscore, timestamp)
    
    def _check_signals(self, zscore: float, timestamp: str):
        """检查是否触发入场/出场/止损信号"""
        entry_threshold = 2.0
        exit_threshold = 0.5
        stop_threshold = 3.0
        
        log = f"[{timestamp}] Z-Score: {zscore:.3f} | Position: {self.position}"
        
        # 入场信号
        if self.position == 0:
            if zscore < -entry_threshold:
                self.position = 1
                self.entry_zscore = zscore
                log += f" | 🟢 入场: 做多 spread (Z < -{entry_threshold})"
            elif zscore > entry_threshold:
                self.position = -1
                self.entry_zscore = zscore
                log += f" | 🔴 入场: 做空 spread (Z > {entry_threshold})"
        # 出场信号
        elif self.position == 1:
            if zscore > -exit_threshold or zscore < -stop_threshold:
                pnl = self.entry_zscore - zscore  # 简化 PnL
                self.position = 0
                log += f" | ⚪ 出场: PnL={pnl:.3f}"
        elif self.position == -1:
            if zscore < exit_threshold or zscore > stop_threshold:
                pnl = zscore - self.entry_zscore
                self.position = 0
                log += f" | ⚪ 出场: PnL={pnl:.3f}"
        
        print(log)
    
    def on_error(self, ws, error):
        """WebSocket 错误处理"""
        print(f"WebSocket 错误: {error}")
    
    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭回调"""
        print(f"连接关闭: {close_status_code} - {close_msg}")
    
    def on_open(self, ws):
        """连接建立后发送订阅消息"""
        subscribe_msg = self._build_subscribe_message()
        ws.send(json.dumps(subscribe_msg))
        print(f"已订阅: {self.symbol1}, {self.symbol2}")
    
    def run_with_reconnect(self, api_key: str):
        """
        运行监控(带指数退避重连)
        
        ⚠️ 生产环境建议:
        1. 使用 aiohttp / websockets 库的异步版本
        2. 分离心跳任务和消息处理任务
        3. 使用 asyncio.gather 并发订阅多对配对
        """
        base_delay = 1
        max_delay = 60
        retry_count = 0
        
        while True:
            try:
                # ⚠️ 生产环境:心跳应使用定时器独立发送
                headers = None  # WebSocket 鉴权使用 URL 参数
                
                self.ws = websocket.WebSocketApp(
                    f"{self.WS_URL}?api_key={api_key}",
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open
                )
                
                # ⚠️ 生产环境:心跳保活使用 threading.Timer 或 asyncio
                self.ws.run_forever(
                    ping_interval=30,
                    ping_timeout=10
                )
                
            except Exception as e:
                retry_count += 1
                delay = min(base_delay * (2 ** retry_count), max_delay)
                jitter = random.uniform(0, delay * 0.1)
                sleep_time = delay + jitter
                
                print(f"连接失败,{sleep_time:.1f} 秒后重连 (第 {retry_count} 次)")
                time.sleep(sleep_time)

五、配对交易的风险管理与仓位控制

5.1 波动率归一化仓位

两只股票的价格量纲不同,直接按金额等权配对会导致暴露不对称。正确做法是按波动率归一化

$$\text{shares}_X = \frac{\text{portfolio_value} \times w}{\text{price}_X \times \sigma_X}$$

$$\text{shares}_Y = \frac{\text{portfolio_value} \times w}{\text{price}_Y \times \sigma_Y}$$

其中 $w$ 为单对配对占总资金的比例,$\sigma$ 为滚动年化波动率。

5.2 最大持仓与相关性衰减

单一配对的最大仓位建议不超过总资金的 5%,全市场同时运行的配对数量建议不超过 10-15 对。随着配对数量增加,资金曲线相关性会趋于稳定(类似因子投资的边际效应递减)。

配对数量 预期夏普提升 边际成本
1 对 基准 -
5 对 +30%
10 对 +45%
20 对 +52% 高(需更多风控资源)

5.3 协整失效的识别与退出

协整关系不是永恒的。识别协整失效的预警信号:

预警指标 阈值 动作
滚动 ADF p-value 连续 5 日 > 0.2 降仓或平仓
价差半衰期 从历史均值延长 3 倍 减少头寸
协整 t-statistic 接近临界值 观察或退出

六、TickDB 在统计套利场景中的价值定位

在协整配对交易的全流程中,数据获取环节的质量直接影响策略表现。以下是与主流数据源的对比:

能力维度 Polygon Yahoo Finance TickDB
历史 K 线完整性 高(10+ 年) 中(部分标的缺失历史) 高(清洗对齐的全量历史)
WebSocket 实时订阅 支持 不支持 支持
API 稳定性 企业级 SLA 免费版不稳定 REST + WebSocket 双通道
美股 tick 级逐笔 支持 不支持 不支持(仅 K 线)
多资产统一接入 是(股票、数字货币、期货等)

对于配对交易策略,TickDB 的核心优势在于:

  • 历史 K 线质量:完整的向前复权处理,避免因除权除息导致的虚假价差跳变
  • 实时推送:WebSocket 订阅机制无需轮询,降低延迟并节省 API 调用配额
  • 多资产覆盖:同一 API 可覆盖美股、港股、数字货币等资产,适合跨市场配对策略

七、结语

统计套利的本质,是在混沌的价格波动中寻找秩序。协整配对交易的成功,既依赖于数学上的严谨检验,也依赖于工程上的稳定实现。从协整系数 $\beta$ 的 OLS 估计,到 WebSocket 推送中的 Z-Score 实时计算,每一步都需要在理论假设与市场现实之间找到平衡。

配对筛选是多对多的组合优化问题,实盘监控是从历史到当下的范式转换。TickDB 在这两个环节中提供的价值,核心是可靠的数据管道:历史回测不因数据缺失而失效,实时信号不因接口抖动而失真。

当你读完本文,协整检验的数学公式和 WebSocket 的心跳机制都已掌握。但真正的门槛在于:你是否愿意为每一对候选配对手工验证协整关系的稳定性,还是愿意建立系统化的流水线,让机器替你完成枯燥的统计检验?


下一步行动

如果你是量化研究员,关注策略逻辑本身

  • 本文提供的筛选流程可作为骨架,用更长的历史数据(建议 5 年以上)验证协整关系的稳健性
  • 建议在样本外数据上做至少 3 个月的 Walk-Forward 测试

如果你希望亲手运行本文代码

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可获取历史 K 线数据

如果你需要机构级数据方案

  • 对于需要 tick 级逐笔成交数据的场景(如港股、数字货币订单流分析),联系 [email protected] 了解 TickDB 专业版

风险提示:本文不构成任何投资建议。统计套利策略存在模型失效风险、市场条件变化风险及流动性风险。历史回测结果不代表未来表现,建议在实际使用前进行充分的样本外验证和压力测试。