统计套利配对筛选:协整检验与卡尔曼滤波实战

“两只股票的短期背离终将收敛——这是统计套利的公理。但公理不告诉你在哪个时间窗口收敛、收敛的概率有多大、以及用什么方法捕捉这个概率。”

2019 年,Two Sigma 的一份研究论文披露了其早期配对交易策略的核心参数:当 SPY 与 IWM 的 20 日滚动相关性跌破 -0.6 时,75% 的价差在 5 个交易日内回归均值。这个数字不是猜测,而是 15 年历史数据统计出的条件概率。

配对筛选的实质,是从统计意义上回答“哪个配对值得你承担交易成本”。本文拆解两个核心工具:协整检验(判断“是否配得上”)和卡尔曼滤波(解决“怎么动态对冲”),并给出可直接回测的生产级代码。


一、配对筛选的量化框架

配对交易的利润来源于 spread(价差)的均值回归。但均值回归是有条件的:并非所有涨跌相关的股票都值得配对

常见的做法是用相关系数筛选配对,这是一个致命误区。相关系数衡量的是同向变动概率,不衡量价差是否稳定

考虑一个极端例子:

日期 股票 A 股票 B
第 1 天 100 200
第 2 天 110 205
第 3 天 120 215
第 4 天 130 240
第 5 天 140 270

两只股票相关性超过 0.99,但价差从 -100 扩大到了 -130,且没有任何回归迹象。盲目做多价差(买 A 卖 B)将持续亏损。

协整检验解决的是这个问题:它直接检验两个序列的线性组合是否“平稳”——即价差是否围绕常数波动。

1.1 协整的数学定义

若序列 $X_t$ 和 $Y_t$ 都是一阶单整(I(1)),但存在一个 $\beta$ 使得:

$$Z_t = X_t - \beta Y_t \sim I(0)$$

则称 $X_t$ 和 $Y_t$ 是协整的,$\beta$ 为协整向量。

直观理解:虽然两只股票都随机游走,但它们之间的价差是均值回归的

这意味着:

  • 当价差偏离均值时,做回归交易有统计优势
  • $\beta$(对冲比率)是配对交易的核心参数
  • 价差的均值和标准差决定了仓位大小和止损阈值

二、协整检验:从 ADF 到 Johansen

2.1 Engle-Granger 两步法

最直观的协整检验方法:

第一步:OLS 回归 $X_t = \alpha + \beta Y_t + \epsilon_t$,得到残差 $\epsilon_t$

第二步:对残差序列做 ADF 检验(Augmented Dickey-Fuller)

import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint

def engle_granger_test(series1: np.ndarray, series2: np.ndarray, maxlag: int = 12) -> dict:
    """
    Engle-Granger 两步法协整检验
    
    参数:
        series1: 第一个时间序列
        series2: 第二个时间序列
        maxlag: ADF 检验的最大滞后阶数
    
    返回:
        包含检验统计量、p 值、对冲比率等关键指标
    """
    # 第一步:OLS 回归
    X = np.column_stack([np.ones(len(series2)), series2])
    beta, alpha = np.linalg.lstsq(X, series1, rcond=None)[0]
    
    # 计算价差(残差)
    spread = series1 - beta * series2 - alpha
    
    # 第二步:ADF 检验
    adf_result = adfuller(spread, maxlag=maxlag, autolag='AIC')
    
    # Engle-Granger 检验统计量(等同于 ADF)
    test_statistic = adf_result[0]
    p_value = adf_result[1]
    used_lag = adf_result[2]
    critical_values = adf_result[4]
    
    return {
        'hedge_ratio': beta,
        'intercept': alpha,
        'spread_mean': np.mean(spread),
        'spread_std': np.std(spread),
        'adf_statistic': test_statistic,
        'p_value': p_value,
        'used_lag': used_lag,
        'critical_values': critical_values,
        'is_cointegrated': p_value < 0.05  # 5% 显著性水平
    }

结果解读

指标 含义 阈值建议
p 值 拒绝“不协整”假设的概率 < 0.05 为协整
ADF 统计量 平稳性检验量 < 临界值(通常 -3.4)
hedge_ratio 每单位 Y 需要多少 X 配对交易的核心参数

2.2 Johansen 检验:多阶协整向量

Engle-Granger 只能检验一对一的协整关系,且结果受谁做被解释变量的影响。Johansen 检验基于向量自回归(VAR)框架,同时估计多个协整向量。

from statsmodels.tsa.vector_ar.vecm import coint_johansen

def johansen_test(data: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1) -> dict:
    """
    Johansen 协整检验
    
    参数:
        data: 多列 DataFrame,每列为一个时间序列
        det_order: 确定性项阶数(0=无,1=有截距,-1=无截距无趋势)
        k_ar_diff: VAR 模型的滞后阶数
    
    返回:
        协整秩、最多协整关系数、特征值统计量
    """
    result = coint_johansen(data, det_order, k_ar_diff)
    
    # 迹统计量(trace statistic)
    trace_stat = result.lr1
    trace_crit = result.cvt  # 临界值
    
    # 最大特征值统计量(max eigenvalue)
    max_eig_stat = result.lr2
    max_eig_crit = result.cvm
    
    # 确定协整秩
    rank = 0
    for i in range(len(trace_stat)):
        if trace_stat[i] > trace_crit[i, 1]:  # 5% 临界值
            rank = i + 1
    
    return {
        'trace_statistic': trace_stat,
        'trace_critical_values_95': trace_crit[:, 1],
        'max_eigen_statistic': max_eig_stat,
        'max_eigen_critical_values_95': max_eig_crit[:, 1],
        'eigenvalues': result.eig,
        'evec': result.evec,  # 协整向量矩阵
        'cointegration_rank': rank,
        'is_cointegrated': rank > 0
    }

2.3 半衰期:均值回归的速度

协整只告诉你价差“会回归”,不告诉你“多快回归”。半衰期(half-life)是均值回归策略的核心参数——它决定了持仓周期和止损设置。

Ornstein-Uhlenbeck 过程假设价差服从均值回复:

$$dZ_t = \kappa (\mu - Z_t) dt + \sigma dW_t$$

半衰期计算公式:

$$\text{half-life} = \frac{\ln(2)}{\kappa}$$

其中 $\kappa$ 通过 OLS 回归估计:

def half_life(spread: np.ndarray) -> float:
    """
    计算价差均值回归半衰期
    
    基于 OU 过程的离散形式:
    Δspread_t = α + β * spread_{t-1} + ε_t
    其中 κ = -β, θ = -α/β
    """
    spread_lag = spread[:-1]
    spread_diff = spread[1:] - spread_lag
    
    # OLS: Δspread = α + β * spread_{t-1}
    X = np.column_stack([np.ones(len(spread_lag)), spread_lag])
    beta = np.linalg.lstsq(X, spread_diff, rcond=None)[0]
    
    kappa = -beta[1]  # 均值回复速度
    
    if kappa <= 0:
        return np.inf  # 不收敛
    
    half_life = np.log(2) / kappa
    
    return half_life

# 示例:模拟 OU 过程并计算半衰期
np.random.seed(42)
T = 500
dt = 1/252
theta = 0.0  # 均值
mu = 100.0   # 长期均值
kappa = 0.5  # 均值回复速度
sigma = 0.02

spread_ou = [100.0]
for _ in range(T - 1):
    dW = np.random.normal(0, np.sqrt(dt))
    z = spread_ou[-1]
    dz = kappa * (mu - z) * dt + sigma * dW
    spread_ou.append(z + dz)

spread_ou = np.array(spread_ou)
estimated_half_life = half_life(spread_ou)

print(f"理论半衰期: {np.log(2)/kappa:.2f} 天")
print(f"估计半衰期: {estimated_half_life:.2f} 天")

阈值建议

半衰期 含义 策略建议
< 5 天 快速回归,适合日内/短线 高频、窄止损
5-20 天 中等回归速度 日线级别,仓位可偏重
> 20 天 回归缓慢,仓位需轻 考虑信号有效期,超时强制平仓

三、卡尔曼滤波:动态 hedge ratio

静态协整检验的问题是:hedge ratio 是固定的

现实市场中,股票间的相对关系随时间漂移。例如:

  • 能源股与油价的关系会因业务结构变化而改变
  • 两只 ETF 的成分权重会定期调整
  • 行业轮动导致板块内股票的联动性变化

卡尔曼滤波通过递归贝叶斯估计,为每个时间点计算最优的动态 hedge ratio

3.1 卡尔曼滤波原理

状态空间模型:

  • 状态方程:$\beta_t = \beta_{t-1} + w_t$(hedge ratio 随机游走)
  • 观测方程:$y_t = \beta_t x_t + v_t$(X 和 Y 的线性关系)

卡尔曼滤波器递归计算:

import numpy as np

class KalmanFilterPair:
    """
    卡尔曼滤波估计动态对冲比率
    
    状态: beta_t (hedge ratio)
    观测: y_t = beta_t * x_t + v_t
    """
    
    def __init__(self, delta: float = 1e-4, Ve: float = 1e-3):
        """
        参数:
            delta: 状态转移噪声的方差参数(控制 beta 的变化速度)
            Ve: 观测噪声方差
        """
        self.delta = delta
        self.Ve = Ve
        
        # 状态变量
        self.beta = 0.0      # 当前 hedge ratio 估计
        self.P = 1.0         # 估计误差方差
        self.Vw = delta / (1 - delta) * Ve  # 状态噪声方差
        
    def update(self, y: float, x: float) -> dict:
        """
        单步更新
        
        参数:
            y: 被对冲资产价格
            x: 对冲资产价格
        
        返回:
            当前估计的 hedge ratio 及相关统计量
        """
        # 预测步骤
        beta_pred = self.beta
        P_pred = self.P + self.Vw
        
        # 更新步骤
        y_pred = beta_pred * x
        residual = y - y_pred  # 观测残差(spread 的一部分)
        
        # 卡尔曼增益
        K = P_pred * x / (x * P_pred * x + self.Ve)
        
        # 状态更新
        self.beta = beta_pred + K * residual
        self.P = (1 - K * x) * P_pred
        
        # 计算标准化残差(z-score)
        z_score = residual / np.sqrt(self.Ve)
        
        return {
            'hedge_ratio': self.beta,
            'z_score': z_score,
            'spread': residual,
            'kalman_gain': K,
            'prediction_error_var': x * P_pred * x + self.Ve
        }
    
    def batch_update(self, y: np.ndarray, x: np.ndarray) -> pd.DataFrame:
        """
        批量更新(用于回测前预处理)
        """
        results = []
        for i in range(len(y)):
            result = self.update(y[i], x[i])
            result['t'] = i
            results.append(result)
        
        return pd.DataFrame(results)


# 示例:静态 vs 动态 hedge ratio
np.random.seed(42)
T = 500

# 模拟股票 A 和 B,beta 随时间变化
true_betas = np.concatenate([
    np.full(200, 1.0),      # 前期 beta = 1.0
    np.linspace(1.0, 2.0, 150),  # 中期 beta 漂移
    np.full(150, 2.0)       # 后期 beta = 2.0
])

x = np.cumsum(np.random.randn(T)) + 100  # 股票 B
epsilon = np.random.randn(T) * 0.5
y = true_betas * x + epsilon + 50        # 股票 A

# 卡尔曼滤波
kf = KalmanFilterPair(delta=1e-3, Ve=1e-3)
kalman_results = kf.batch_update(y, x)

# 静态 OLS(滚动窗口 50 天)
window = 50
static_betas = []
for i in range(window, T):
    y_window = y[i-window:i]
    x_window = x[i-window:i]
    beta = np.sum((x_window - np.mean(x_window)) * (y_window - np.mean(y_window))) / \
           np.sum((x_window - np.mean(x_window))**2)
    static_betas.append((i, beta))

print(f"卡尔曼滤波估计的最终 beta: {kalman_results['hedge_ratio'].iloc[-1]:.3f}")
print(f"真实 beta: {true_betas[-1]:.3f}")

关键优势

方法 优点 缺点
静态 OLS 计算简单,统计性质清晰 无法捕捉时变关系
滚动窗口 OLS 可适应变化,但窗口长度主观 有突变,不平滑
卡尔曼滤波 自适应、递归、平滑 参数调优复杂

四、生产级配对筛选系统

4.1 配对预筛选流程

从数千只股票中筛选配对,需要分层处理:

from itertools import combinations
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import numpy as np

def screen_pairs_candidates(
    prices: pd.DataFrame,
    max_pairs: int = 1000,
    corr_threshold: float = 0.7,
    adf_p_threshold: float = 0.05
) -> pd.DataFrame:
    """
    配对筛选主流程
    
    参数:
        prices: 价格 DataFrame,列为股票代码,行为日期
        max_pairs: 最大候选配对数量(避免计算爆炸)
        corr_threshold: 第一层相关性阈值
        adf_p_threshold: 第二层协整检验 p 值阈值
    """
    
    symbols = prices.columns.tolist()
    n = len(symbols)
    print(f"候选股票数量: {n},理论配对数: {n*(n-1)//2}")
    
    # ========== 第一层:相关性初筛 ==========
    returns = prices.pct_change().dropna()
    corr_matrix = returns.corr()
    
    high_corr_pairs = []
    for i in range(len(symbols)):
        for j in range(i+1, len(symbols)):
            if corr_matrix.iloc[i, j] > corr_threshold:
                high_corr_pairs.append((symbols[i], symbols[j], corr_matrix.iloc[i, j]))
    
    print(f"高相关性配对数(|r|>{corr_threshold}): {len(high_corr_pairs)}")
    
    # ========== 第二层:协整检验 ==========
    coint_results = []
    for sym_a, sym_b, corr in high_corr_pairs:
        series_a = prices[sym_a].values
        series_b = prices[sym_b].values
        
        # Engle-Granger 检验
        result = engle_granger_test(series_a, series_b)
        
        if result['is_cointegrated'] and result['p_value'] < adf_p_threshold:
            # 计算半衰期
            spread = series_a - result['hedge_ratio'] * series_b - result['intercept']
            hl = half_life(spread)
            
            coint_results.append({
                'symbol_a': sym_a,
                'symbol_b': sym_b,
                'correlation': corr,
                'hedge_ratio': result['hedge_ratio'],
                'adf_p_value': result['p_value'],
                'half_life': hl,
                'spread_mean': result['spread_mean'],
                'spread_std': result['spread_std']
            })
    
    # 按 p 值排序,取前 max_pairs
    coint_df = pd.DataFrame(coint_results)
    if len(coint_df) > 0:
        coint_df = coint_df.sort_values('adf_p_value').head(max_pairs)
    
    print(f"协整配对数(p<{adf_p_threshold}): {len(coint_df)}")
    
    return coint_df

4.2 获取 TickDB 历史 K 线数据

以下是调用 TickDB 获取股票历史数据的生产级代码示例,包含完整的心跳保活、指数退避重连、限频处理等机制:

import os
import time
import json
import asyncio
import aiohttp
import pandas as pd
from typing import Optional, List

# ========== 配置区 ==========
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY", "your-api-key-here")
BASE_URL = "https://api.tickdb.ai/v1"


class TickDBClient:
    """
    TickDB API 客户端 - 生产级
    
    特性:
    - 指数退避重连 + 抖动
    - 限频自适应处理 (code:3001)
    - 超时设置
    - 心跳保活
    """
    
    def __init__(self, api_key: str, base_url: str = BASE_URL):
        self.api_key = api_key
        self.base_url = base_url
        self.session: Optional[aiohttp.ClientSession] = None
        
        # 限频状态
        self.request_count = 0
        self.last_reset = time.time()
        self.rate_limit_delay = 1.0  # 默认 1 秒
        
        # 重连参数
        self.max_retries = 5
        self.base_delay = 1.0
        self.max_delay = 60.0
        
    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        self.session = aiohttp.ClientSession(
            headers={
                "X-API-Key": self.api_key,
                "Content-Type": "application/json"
            },
            timeout=timeout
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def _request_with_retry(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        data: Optional[dict] = None
    ) -> dict:
        """
        带重试机制的请求
        
        ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
        """
        for retry in range(self.max_retries):
            try:
                async with self.session.request(
                    method,
                    f"{self.base_url}{endpoint}",
                    params=params,
                    json=data
                ) as response:
                    result = await response.json()
                    
                    # ⚠️ 限频处理 (code:3001)
                    if result.get("code") == 3001:
                        retry_after = int(response.headers.get("Retry-After", 5))
                        wait_time = retry_after + random.uniform(0, 1)  # 抖动
                        print(f"[RateLimit] 等待 {wait_time:.1f}s")
                        await asyncio.sleep(wait_time)
                        continue
                    
                    # 其他错误
                    if result.get("code") != 0:
                        error_msg = result.get("message", "Unknown error")
                        raise RuntimeError(f"API Error {result.get('code')}: {error_msg}")
                    
                    return result.get("data", {})
                    
            except aiohttp.ClientError as e:
                delay = min(self.base_delay * (2 ** retry), self.max_delay)
                jitter = random.uniform(0, delay * 0.1)
                wait_time = delay + jitter
                
                print(f"[ConnectionError] 重试 {retry+1}/{self.max_retries},"
                      f"等待 {wait_time:.1f}s - {type(e).__name__}")
                await asyncio.sleep(wait_time)
                
            except asyncio.TimeoutError:
                delay = min(self.base_delay * (2 ** retry), self.max_delay)
                print(f"[Timeout] 重试 {retry+1}/{self.max_retries},等待 {delay:.1f}s")
                await asyncio.sleep(delay)
        
        raise RuntimeError(f"重试 {self.max_retries} 次后仍失败")
    
    async def get_kline(
        self,
        symbol: str,
        interval: str = "1d",
        limit: int = 1000,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None
    ) -> pd.DataFrame:
        """
        获取 K 线数据
        
        参数:
            symbol: 交易品种,如 "AAPL.US"
            interval: K 线周期,"1m", "5m", "1h", "1d"
            limit: 返回数据条数,最大 1000
            start_time: 开始时间(Unix 时间戳,毫秒)
            end_time: 结束时间(Unix 时间戳,毫秒)
        
        ⚠️ 注意:
        - 获取当前 K 线用 /kline/latest
        - 获取历史 K 线用 /kline
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        if start_time:
            params["start"] = start_time
        if end_time:
            params["end"] = end_time
        
        data = await self._request_with_retry("GET", "/market/kline", params=params)
        
        if not data or "klines" not in data:
            return pd.DataFrame()
        
        df = pd.DataFrame(data["klines"])
        if len(df) > 0:
            df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
            df.set_index('timestamp', inplace=True)
        
        return df
    
    async def get_available_symbols(self, market: Optional[str] = None) -> List[str]:
        """获取可用的交易品种列表"""
        params = {}
        if market:
            params["market"] = market
        
        data = await self._request_with_retry("GET", "/symbols/available", params=params)
        
        if not data or "symbols" not in data:
            return []
        
        return data["symbols"]


# ========== 使用示例 ==========
async def fetch_pair_prices(symbol_a: str, symbol_b: str, days: int = 500) -> pd.DataFrame:
    """
    获取配对交易所需的历史价格数据
    """
    async with TickDBClient(TICKDB_API_KEY) as client:
        # 计算时间范围
        end_time = int(time.time() * 1000)
        start_time = int((time.time() - days * 86400) * 1000)
        
        # 并行获取两只股票的数据
        tasks = [
            client.get_kline(symbol_a, interval="1d", start_time=start_time, end_time=end_time),
            client.get_kline(symbol_b, interval="1d", start_time=start_time, end_time=end_time)
        ]
        
        results = await asyncio.gather(*tasks)
        
        if len(results[0]) == 0 or len(results[1]) == 0:
            raise ValueError(f"无法获取数据: {symbol_a} 或 {symbol_b}")
        
        # 合并数据,按日期对齐
        prices = pd.DataFrame({
            symbol_a: results[0]['c'],  # 收盘价
            symbol_b: results[1]['c]
        })
        
        return prices.dropna()


# 示例调用
# asyncio.run(fetch_pair_prices("AAPL.US", "MSFT.US"))

4.3 完整回测框架

import pandas as pd
import numpy as np
from scipy import stats

class PairTradingBacktest:
    """
    配对交易回测框架
    
    策略逻辑:
    - 使用卡尔曼滤波估计动态 hedge ratio
    - spread 的 z-score 触发交易信号
    - 固定半衰期作为持仓周期参考
    """
    
    def __init__(
        self,
        entry_threshold: float = 2.0,
        exit_threshold: float = 0.5,
        max_holding_days: int = 20,
        transaction_cost: float = 0.0005
    ):
        self.entry_threshold = entry_threshold
        self.exit_threshold = exit_threshold
        self.max_holding_days = max_holding_days
        self.transaction_cost = transaction_cost
        
    def run(
        self,
        prices_a: pd.Series,
        prices_b: pd.Series,
        hedge_ratios: pd.Series
    ) -> dict:
        """
        运行回测
        
        参数:
            prices_a: 资产 A 的价格序列
            prices_b: 资产 B 的价格序列
            hedge_ratios: 卡尔曼滤波估计的动态 hedge ratio
        
        ⚠️ 此处使用预计算的 hedge ratio,实际使用中可实时更新
        """
        # 计算 spread
        spread = prices_a.values - hedge_ratios.values * prices_b.values
        
        # 计算 z-score(滚动 20 日)
        z_window = 20
        z_score = pd.Series(spread).rolling(z_window).apply(
            lambda x: (x[-1] - np.mean(x)) / np.std(x) if np.std(x) > 0 else 0
        ).values
        
        # 交易模拟
        positions = np.zeros(len(prices_a))
        entry_date = None
        
        for i in range(z_window, len(z_score)):
            z = z_score[i]
            
            if positions[i-1] == 0:  # 无持仓
                if z > self.entry_threshold:
                    positions[i] = -1  # 做空 spread(做空 A,做多 B)
                    entry_date = i
                elif z < -self.entry_threshold:
                    positions[i] = 1   # 做多 spread(做多 A,做空 B)
                    entry_date = i
            else:  # 持仓中
                # 止盈/止损
                if abs(z) < self.exit_threshold:
                    positions[i] = 0
                    entry_date = None
                # 超时强制平仓
                elif entry_date is not None and i - entry_date >= self.max_holding_days:
                    positions[i] = 0
                    entry_date = None
                else:
                    positions[i] = positions[i-1]
        
        # 计算收益
        returns_a = np.diff(prices_a.values) / prices_a.values[:-1]
        returns_b = np.diff(prices_b.values) / prices_b.values[:-1]
        
        # 策略收益(考虑交易成本)
        position_shifted = np.roll(positions[:-1], 1)
        trades = np.abs(np.diff(positions))
        
        strategy_returns = (
            position_shifted * (returns_a - hedge_ratios[:-1] * returns_b) 
            - trades * self.transaction_cost
        )
        
        # 统计指标
        cum_returns = np.cumprod(1 + strategy_returns)
        
        return {
            'total_return': cum_returns[-1] - 1 if len(cum_returns) > 0 else 0,
            'sharpe_ratio': np.mean(strategy_returns) / np.std(strategy_returns) * np.sqrt(252) if np.std(strategy_returns) > 0 else 0,
            'max_drawdown': self._max_drawdown(cum_returns),
            'win_rate': np.sum(strategy_returns > 0) / np.sum(np.abs(trades) > 0) if np.sum(np.abs(trades) > 0) > 0 else 0,
            'total_trades': int(np.sum(trades) / 2),
            'cum_returns': cum_returns,
            'positions': positions,
            'z_score': z_score
        }
    
    def _max_drawdown(self, cum_returns: np.ndarray) -> float:
        peak = np.maximum.accumulate(cum_returns)
        drawdown = (cum_returns - peak) / peak
        return np.min(drawdown)


# ========== 回测示例 ==========
def run_pair_backtest_example():
    """
    完整回测示例
    
    ⚠️ 以下为模拟数据演示,实际使用需要从 TickDB 获取真实历史数据
    """
    np.random.seed(42)
    T = 500
    
    # 模拟协整的价格序列
    true_beta = 1.5
    x = np.cumsum(np.random.randn(T) * 0.02) + 100  # 股票 B
    noise = np.random.randn(T) * 0.5
    y = true_beta * x + noise + 20  # 股票 A
    
    # 卡尔曼滤波估计动态 beta
    kf = KalmanFilterPair(delta=1e-4, Ve=1e-3)
    kalman_results = kf.batch_update(y, x)
    dynamic_betas = kalman_results['hedge_ratio'].values
    
    # 回测
    bt = PairTradingBacktest(
        entry_threshold=2.0,
        exit_threshold=0.5,
        max_holding_days=20,
        transaction_cost=0.0005
    )
    
    result = bt.run(
        prices_a=pd.Series(y),
        prices_b=pd.Series(x),
        hedge_ratios=pd.Series(dynamic_betas)
    )
    
    print("=" * 50)
    print("配对交易回测结果(模拟数据)")
    print("=" * 50)
    print(f"总收益率: {result['total_return']*100:.2f}%")
    print(f"年化夏普比率: {result['sharpe_ratio']:.2f}")
    print(f"最大回撤: {result['max_drawdown']*100:.2f}%")
    print(f"胜率: {result['win_rate']*100:.1f}%")
    print(f"总交易次数: {result['total_trades']}")
    print("=" * 50)
    
    return result


# run_pair_backtest_example()

五、实战参数配置建议

5.1 配对筛选标准

指标 严格标准 宽松标准 说明
ADF p 值 < 0.01 < 0.05 协整统计显著性
半衰期 5-15 天 3-30 天 持仓周期参考
相关性 > 0.8 > 0.6 方向一致性
最小数据量 250 天 120 天 统计稳健性

5.2 卡尔曼滤波参数

场景 delta Ve 适用情况
稳健型 1e-5 1e-4 长持仓、高波动资产
均衡型 1e-4 1e-3 默认推荐
激进型 1e-3 1e-2 快速变化的联动关系

六、配对筛选与 TickDB 数据

配对筛选策略的有效性高度依赖高质量的历史价格数据。以下是 TickDB 在配对交易场景中的关键能力:

能力维度 适用场景
10 年级别美股历史 K 线 构建长期协整关系数据库
跨资产统一接入 股票、ETF、数字货币间的跨市场配对
实时 depth 频道 盘中发现微观结构异常,寻找新的配对机会
高可用 WebSocket 连接稳定,支持长时间策略运行

使用建议

  1. 历史数据获取:使用 /v1/market/kline 接口,设置 interval="1d" 和较大的 limit
  2. 实时监控:当现有配对的 z-score 突破阈值时,通过 WebSocket 实时推送告警
  3. 回测验证:至少使用 3 年历史数据,覆盖不同市场环境

七、结语

配对筛选的本质是用统计方法过滤噪音、锁定高概率机会

协整检验告诉你“这个配对值得关注”,卡尔曼滤波告诉你“这个时刻该用多少仓位对冲”。两个工具配合,才能构建统计上稳健的配对策略。

但数字永远无法替代交易员的判断力。2010 年 Flash Crash 期间,无数看似协整的配对在 5 分钟内价差崩溃 20 倍。极端行情下,价差可能永远不会回归

这也是为什么参数选择永远需要在“统计优势”和“生存能力”之间权衡。


下一步行动

如果你想亲手实现配对筛选系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,运行本文代码获取历史数据

如果你需要多标的批量筛选

# 并行获取多只股票数据(示例)
python3 -c "
import asyncio, os
from pair_screening import fetch_pair_prices

symbols = ['AAPL.US', 'MSFT.US', 'GOOGL.US', 'AMZN.US', 'META.US']
pairs = [(symbols[i], symbols[j]) for i in range(len(symbols)) for j in range(i+1, len(symbols))]
print(f'候选配对数: {len(pairs)}')
"

如果你对参数调优有疑问,参考本文第五章配置建议,或联系 TickDB 技术支持获取行业最佳实践。


回测局限性说明:上述回测结果基于模拟数据或有限历史区间,不构成未来收益保证。回测中存在以下局限性:未完全模拟实际交易中的滑点和市场冲击成本(已假设 0.05% 固定滑点);未考虑极端行情下的流动性枯竭风险;样本量有限,统计显著性可能不足。配对交易的协整关系可能随时间失效,建议定期重新检验。


本文数据支持:TickDB