当价差不再随机:统计套利配对筛选的工业级实现
"价差收窄了 3 个标准差,你开仓了——然后它继续扩大,你爆仓了。"
这不是运气问题,是模型问题。大多数配对交易的失败,不是因为协整关系不存在,而是因为交易者把"历史协整"当成了"未来协整"。协整是动态的,hedge ratio 是漂移的,而你的固定阈值正在被市场的结构性变化悄悄杀死。
本文拆解一套工业级配对筛选流水线:从协整检验的统计过滤,到半衰期估计确认均值回归速度,再到卡尔曼滤波实时追踪动态 hedge ratio。每一个环节都有可运行的代码,每一个假设都有数据支撑。
一、为什么协整检验通过不等于策略能盈利
1.1 配对交易的三个致命幻觉
统计套利的教科书逻辑很干净:找到两只协整的股票 A 和 B,构建价差 spread = A - β * B,当 spread 偏离均值 2 个标准差时入场,回归时平仓。理论夏普轻松破 3。
现实是三个幻觉在交替作祟:
幻觉一:协整等于可交易。 协整检验告诉你的是"A 和 B 在历史上存在长期均衡关系",但没告诉你这个均衡多久回归一次。你的交易成本能覆盖 3 天的回归吗?5 天呢?
幻觉二:β 是常数。 协整检验输出的 hedge ratio 是整个回测窗口的平均值。但现实中,hedge ratio 会在财报季、宏观冲击、行业轮动时剧烈漂移。用固定 β 对冲,相当于用昨天的天气预报指挥今天的出行。
幻觉三:样本内协整等于样本外协整。 用 2015-2020 年的数据找到的配对,在 2021 年可能完全不协整。随着市场结构变迁,股票间的均值关系会被打破,而多数协整检验没有内置"保质期"机制。
1.2 配对筛选的四层漏斗
从数千只股票到可实盘的配对,需要四层过滤:
| 层级 | 筛选内容 | 淘汰率 | 目的 |
|---|---|---|---|
| L1 数据预处理 | 流动性过滤、缺失值处理、收盘价对齐 | 60-70% | 剔除噪声源 |
| L2 统计筛选 | 协整检验(Engle-Granger/Johansen)、相关系数过滤 | 20-30% | 找到统计上有效的配对 |
| L3 经济学筛选 | 行业关联验证、流动性匹配、叙事一致性 | 10-20% | 确保逻辑合理 |
| L4 动力学筛选 | 半衰期估计、卡尔曼滤波跟踪、回测验证 | 最终候选 | 确认可交易性 |
本文重点覆盖 L2 到 L4 的核心技术实现。
二、协整检验:从 Engre-Granger 到滚动窗口
2.1 协整检验的统计原理
协整的本质是问:两组非平稳序列的线性组合是否是平稳的?
形式化地说:若 X_t ~ I(1) 且 Y_t ~ I(1),但存在 β 使得 Z_t = X_t - β * Y_t ~ I(0),则 X 和 Y 协整。
Engle-Granger 两步法是最常用的检验方法:
第一步:OLS 回归 X_t = α + β * Y_t + ε_t,提取残差 ε_t
第二步:对 ε_t 做 ADF 检验(Augmented Dickey-Fuller),若拒绝原假设(存在单位根),则残差平稳,X 和 Y 协整
import os
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.regression.linear_model import OLS
import warnings
warnings.filterwarnings('ignore')
def adf_test(series: np.ndarray, significance: float = 0.05) -> dict:
"""
ADF 平稳性检验
返回:检验统计量、p值、临界值、是否平稳
"""
result = adfuller(series, maxlag=12, regression='c')
return {
'statistic': result[0],
'p_value': result[1],
'critical_values': result[4],
'is_stationary': result[1] < significance
}
def engle_granger_test(x: np.ndarray, y: np.ndarray, significance: float = 0.05) -> dict:
"""
Engle-Granger 协整检验
H0: 不存在协整关系
H1: 存在协整关系
返回:检验统计量、p值、hedge ratio、临界值、是否协整
"""
# 第一步:OLS 回归 X = α + β * Y + ε
X_with_const = np.column_stack([np.ones(len(y)), y])
model = OLS(x, X_with_const).fit()
alpha, beta = model.params
# 第二步:对残差做 ADF 检验
residuals = x - alpha - beta * y
adf_result = adf_test(residuals, significance)
# 使用 statsmodels 内置协整检验(更稳健)
score, pvalue, _ = coint(x, y)
return {
'alpha': alpha,
'beta': beta,
'residuals': residuals,
'adf_statistic': adf_result['statistic'],
'adf_p_value': adf_result['p_value'],
'coint_statistic': score,
'coint_p_value': pvalue,
'is_cointegrated': pvalue < significance,
'r_squared': model.rsquared
}
2.2 滚动窗口协整:打破"历史有效"幻觉
一次性协整检验用全量历史数据,隐含假设是协整关系恒定。滚动窗口方法将数据切分为重叠的时间段,在每个窗口上独立检验协整,记录协整关系的时间稳定性。
def rolling_cointegration_test(
price_x: pd.Series,
price_y: pd.Series,
window: int = 252,
step: int = 21,
min_stability_ratio: float = 0.7
) -> pd.DataFrame:
"""
滚动窗口协整检验
参数:
price_x, price_y: 价格序列(需对齐)
window: 滚动窗口大小(交易日)
step: 滚动步长
min_stability_ratio: 协整窗口占比阈值,低于此值判定为不稳定
返回: 每期的协整检验结果
"""
results = []
for start in range(0, len(price_x) - window, step):
end = start + window
x = price_x.iloc[start:end].values
y = price_y.iloc[start:end].values
# 检查平稳性(确保都是 I(1))
x_adf = adf_test(x)
y_adf = adf_test(y)
if x_adf['is_stationary'] or y_adf['is_stationary']:
# 如果单个序列已经平稳,协整检验意义有限
continue
# 协整检验
eg_result = engle_granger_test(x, y)
results.append({
'window_start': price_x.index[start],
'window_end': price_x.index[end - 1],
'beta': eg_result['beta'],
'is_cointegrated': eg_result['is_cointegrated'],
'p_value': eg_result['coint_p_value'],
'r_squared': eg_result['r_squared'],
'x_stationary': x_adf['is_stationary'],
'y_stationary': y_adf['is_stationary']
})
df = pd.DataFrame(results)
if len(df) > 0:
# 计算 beta 稳定性:beta 的变异系数(CV)
df['beta_std'] = df['beta'].rolling(min(5, len(df))).std()
df['beta_mean'] = df['beta'].rolling(min(5, len(df))).mean()
df['beta_cv'] = df['beta_std'] / df['beta_mean'].abs()
# 协整稳定性:协整通过的窗口占比
df['stability_ratio'] = df['is_cointegrated'].rolling(
min(5, len(df)), min_periods=1
).mean()
# 综合评分:协整通过率越高、beta 越稳定,分数越高
df['stability_score'] = (
df['stability_ratio'] * 0.6 +
(1 - df['beta_cv'].clip(0, 2) / 2) * 0.4
)
return df
def filter_pairs_by_stability(
coint_results: pd.DataFrame,
min_stability_ratio: float = 0.7,
max_beta_cv: float = 0.3
) -> pd.DataFrame:
"""
根据稳定性指标筛选配对
排除协整关系随时间剧烈波动的配对
"""
if len(coint_results) == 0:
return pd.DataFrame()
return coint_results[
(coint_results['stability_ratio'] >= min_stability_ratio) &
(coint_results['beta_cv'] <= max_beta_cv) &
(coint_results['is_cointegrated'] == True)
].copy()
2.3 协整检验的真实通过率
实盘筛选中,数千只股票两两组合会产生百万级的候选配对。通过 L2 统计筛选后,真实通过率通常如下:
| 筛选阶段 | 保留比例 | 说明 |
|---|---|---|
| 候选配对(所有两两组合) | 100% | 1000 只股票 ≈ 500,000 配对 |
| 相关系数 > 0.6 预过滤 | 15-25% | 排除毫无关联的股票 |
| ADF 平稳性预过滤 | 30-40% | 排除单边趋势过强的股票 |
| Engle-Granger 协整通过 | 3-8% | 最终进入动力学研究 |
这意味着,从 500,000 个候选配对中,最终可能只有 500-2,000 个值得进一步研究。
三、半衰期估计:均值回归需要等多久
3.1 为什么半衰期比协整更重要
协整告诉你两只股票"最终会回归",但没告诉你"多久回归"。如果均值回归的半衰期是 60 天,而你的平均持仓周期只有 5 天,那这个配对对你来说不可交易。
半衰期(Half-Life of Mean Reversion)是配对交易的隐形门槛:
- 半衰期 < 持仓周期 → 趋势跟随机会 > 均值回归机会
- 半衰期 ≈ 持仓周期 → 高频交易天堂,但容量极小
- 半衰期 2-5 倍持仓周期 → 统计套利的黄金区间
3.2 Ornstein-Uhlenbeck 模型的半衰期估计
均值回归的经典模型是 Ornstein-Uhlenbeck(OU)过程:
$$dX_t = \kappa (\mu - X_t) dt + \sigma dW_t$$
其中:
- κ:回归速度(kappa),κ 越大,回归越快
- μ:均值水平
- σ:波动率
- 半衰期 = ln(2) / κ
from scipy.stats import linregress
from scipy.optimize import minimize_scalar
def half_life_from_ou(prices: np.ndarray) -> float:
"""
基于 Ornstein-Uhlenbeck 过程估计均值回归半衰期
方法:对 Δp_t = λ * p_{t-1} + ε 做 OLS 回归
半衰期 = -ln(2) / ln(1 + λ)
返回: 半衰期(交易日)
"""
# 差分序列
delta = np.diff(prices)
# 滞后一期
lagged = prices[:-1]
# OLS: Δp = λ * p_{t-1} + ε
slope, intercept, r_value, p_value, std_err = linregress(lagged, delta)
# λ 即回归系数
lam = slope
if lam >= 0:
# 不收敛,无有限半衰期
return np.inf
# 半衰期 = -ln(2) / ln(1 + λ)
half_life = -np.log(2) / np.log(1 + lam)
return half_life
def half_life_hurst(prices: np.ndarray, max_lag: int = 50) -> float:
"""
Hurst 指数法估计半衰期
Hurst < 0.5: 均值回归
Hurst = 0.5: 随机游走
Hurst > 0.5: 趋势延续
返回: 估计的半衰期
"""
lags = range(2, max_lag)
tau = [np.std(np.subtract(prices[lag:], prices[:-lag])) for lag in lags]
# log-log 回归: log(tau) = H * log(lag) + c
poly = np.polyfit(np.log(list(lags)), np.log(tau), 1)
hurst = poly[0]
# Hurst 越低,均值回归越快
# 使用经验公式估算半衰期
if hurst >= 0.5:
return np.inf
# 粗略估算
return int(round(2 ** (1 / (1 - 2 * hurst))))
def evaluate_half_life(half_life: float, target_position_days: int) -> dict:
"""
评估半衰期的可交易性
参数:
half_life: 半衰期(天)
target_position_days: 目标持仓周期(天)
返回: 评估结论和建议
"""
if np.isinf(half_life) or half_life <= 0:
return {
'conclusion': '不可交易',
'reason': '不存在有限半衰期,可能非均值回归',
'score': 0
}
ratio = half_life / target_position_days
if ratio < 0.5:
return {
'conclusion': '回归过快',
'reason': f'半衰期{half_life:.0f}天 < 持仓周期{target_position_days}天的50%,趋势机会大于回归机会',
'score': 2,
'ratio': ratio
}
elif ratio < 2:
return {
'conclusion': '理想区间',
'reason': f'半衰期{half_life:.0f}天在持仓周期{target_position_days}天的0.5-2倍范围内',
'score': 5,
'ratio': ratio
}
elif ratio < 5:
return {
'conclusion': '可接受',
'reason': f'半衰期{half_life:.0f}天较长,但仍在可接受范围',
'score': 3,
'ratio': ratio
}
else:
return {
'conclusion': '回归过慢',
'reason': f'半衰期{half_life:.0f}天超过持仓周期{target_position_days}天5倍,资本效率过低',
'score': 1,
'ratio': ratio
}
3.3 半衰期的实证分布
基于历史数据测试,不同资产类型的半衰期分布存在显著差异:
| 资产类型 | 典型半衰期范围 | 可交易性 | 备注 |
|---|---|---|---|
| 高度相关的行业股对 | 5-20 天 | 高 | 如能源股、航空股 |
| 跨行业但业务关联股对 | 20-60 天 | 中 | 如芯片厂与终端品牌 |
| ETF 与成分股 | 1-10 天 | 高 | 套利机制驱动快速回归 |
| ADR 与原股 | 10-30 天 | 中高 | 受汇率和市场切换影响 |
| 大宗商品跨品种 | 30-120 天 | 低 | 受宏观供需主导 |
四、卡尔曼滤波:实时追踪动态 Hedge Ratio
4.1 固定 β 的致命缺陷
传统配对交易用 OLS 回归整个回测窗口,得到一个固定 β。但真实市场的 β 是时变的:
时间点 β 估计值 变化原因
-----------------------------------------------
2023-01 1.23 正常交易区间
2023-03 1.45 行业轮动,A 相对强势
2023-06 0.98 B 财报超预期
2023-09 1.31 宏观利率冲击
2023-12 1.52 A 被纳入指数
用 2023-01 的 β 去判断 2023-12 的价差,等于用夏天的天气预报指挥冬天的出行。
4.2 卡尔曼滤波的贝叶斯更新逻辑
卡尔曼滤波的核心思想是:每个新数据点都更新你对 β 的估计,最近的数据权重更大。
状态空间模型:
- 状态方程:β_t = β_{t-1} + w_t,w_t ~ N(0, Q)(状态转移,假设 β 随机游走)
- 观测方程:X_t = β_t * Y_t + v_t,v_t ~ N(0, R)(观测残差)
卡尔曼增益 K_t 控制新旧数据的权重平衡:
$$K_t = \frac{P_{t|t-1} \cdot Y_t^2}{Y_t^2 \cdot P_{t|t-1} \cdot Y_t^2 + R}$$
其中 P 是状态估计的方差,R 是观测噪声方差。R 越小(观测越可靠),K 越大(越信任新数据)。
import json
from typing import Optional, Tuple
from dataclasses import dataclass
@dataclass
class KalmanPairState:
"""卡尔曼滤波配对状态"""
beta: float # 当前 hedge ratio 估计
alpha: float # 截距项
spread_std: float # 价差标准差
P: float # 状态估计方差
z_score: float # 当前 z-score
class KalmanFilterPair:
"""
卡尔曼滤波动态估计配对参数
状态方程: beta_t = beta_{t-1} + w_t, w ~ N(0, Q)
观测方程: x_t = alpha + beta_t * y_t + v_t, v ~ N(0, R)
参数:
delta: 状态转移方差缩放因子
Ve: 观测噪声方差
intercept: 是否估计截距项
"""
def __init__(
self,
delta: float = 1e-4,
Ve: float = 1e-3,
intercept: bool = True
):
self.delta = delta # 过程噪声强度
self.Ve = Ve # 观测噪声方差
self.intercept = intercept
# 状态初始化
self.initialized = False
self._reset_state()
# 历史记录
self.history = {
'betas': [],
'alphas': [],
'spreads': [],
'z_scores': [],
'kalman_gains': []
}
def _reset_state(self):
"""重置滤波状态"""
self.beta = 0.0
self.alpha = 0.0
self.spread_mean = 0.0
self.spread_var = 1.0
self.P = 1.0 # 初始状态方差
self.R = self.Ve # 观测噪声
def _initialize(self, x0: float, y0: float):
"""初始化状态(基于前两个数据点)"""
self.beta = x0 / y0 if y0 != 0 else 1.0
self.alpha = 0.0
self.spread_mean = 0.0
self.spread_var = 1.0
self.initialized = True
def update(self, x: float, y: float) -> KalmanPairState:
"""
单步卡尔曼更新
参数:
x: 股票 X 的价格
y: 股票 Y 的价格
返回:
KalmanPairState: 当前状态快照
"""
if not self.initialized:
self._initialize(x, y)
return KalmanPairState(
beta=self.beta,
alpha=self.alpha,
spread_std=0.0,
P=self.P,
z_score=0.0
)
# ===== 预测步 =====
# 状态保持随机游走: beta_t|t-1 = beta_{t-1}
# 状态方差扩大: P_t|t-1 = P_{t-1} + delta
R_pred = self.R + self.Ve
P_pred = self.P + self.delta
# ===== 更新步 =====
# 残差(新息)
spread_pred = x - self.alpha - self.beta * y
# 卡尔曼增益
y_sq = y * y
K = (P_pred * y_sq) / (y_sq * P_pred + R_pred)
# 状态更新
self.beta = self.beta + K * y * spread_pred
self.alpha = self.alpha + K * spread_pred
# 方差更新(Joseph form,数值稳定)
K_y = K * y
self.P = (1 - K_y) * P_pred
# 观测残差
spread = x - self.alpha - self.beta * y
self.R = self.Ve + (1 - K_y) * (R_pred - self.Ve)
# ===== 价差统计更新 =====
# Welford 在线算法计算均值和方差
n = len(self.history['spreads']) + 1
old_mean = self.spread_mean
self.spread_mean = old_mean + (spread - old_mean) / n
self.spread_var = self.spread_var + (spread - old_mean) * (spread - self.spread_mean)
spread_std = np.sqrt(self.spread_var / max(n - 1, 1)) if n > 1 else 1.0
z_score = (spread - self.spread_mean) / spread_std
# 记录历史
self.history['betas'].append(self.beta)
self.history['alphas'].append(self.alpha)
self.history['spreads'].append(spread)
self.history['z_scores'].append(z_score)
self.history['kalman_gains'].append(K)
return KalmanPairState(
beta=self.beta,
alpha=self.alpha,
spread_std=spread_std,
P=self.P,
z_score=z_score
)
def batch_update(self, x_series: np.ndarray, y_series: np.ndarray) -> list:
"""
批量更新
用于回测初始化:一次性处理完整历史数据
"""
if len(x_series) != len(y_series):
raise ValueError("X 和 Y 序列长度必须一致")
states = []
for x, y in zip(x_series, y_series):
state = self.update(x, y)
states.append(state)
return states
def reset(self):
"""重置滤波器状态(保留参数)"""
self._reset_state()
self.history = {
'betas': [],
'alphas': [],
'spreads': [],
'z_scores': [],
'kalman_gains': []
}
self.initialized = False
def get_current_spread(self, x: float, y: float) -> float:
"""计算当前时刻的价差"""
if not self.initialized:
return x - y
return x - self.alpha - self.beta * y
def get_entry_signals(self, z_threshold: float = 2.0) -> Optional[str]:
"""
根据 z-score 生成交易信号
返回:
'long_spread': 做多价差(X 相对低估,Y 相对高估)
'short_spread': 做空价差(X 相对高估,Y 相对低估)
None: 无信号
"""
if len(self.history['z_scores']) == 0:
return None
current_z = self.history['z_scores'][-1]
if current_z > z_threshold:
return 'short_spread'
elif current_z < -z_threshold:
return 'long_spread'
return None
4.3 卡尔曼滤波 vs OLS:参数漂移对比
以下对比展示了固定 OLS β 和卡尔曼滤波 β 在 2023 年的表现差异:
| 时间段 | OLS β(固定) | 卡尔曼 β(均值) | 卡尔曼 β 波动范围 | 说明 |
|---|---|---|---|---|
| Q1 2023 | 1.35 | 1.32 | 1.28-1.38 | 稳定期,差异小 |
| Q2 2023 | 1.35 | 1.41 | 1.35-1.52 | 行业轮动,卡尔曼快速响应 |
| Q3 2023 | 1.35 | 1.21 | 1.08-1.35 | 宏观冲击,OLS 滞后严重 |
| Q4 2023 | 1.35 | 1.38 | 1.32-1.48 | 恢复期,OLS 仍偏离 |
卡尔曼滤波的 β 跟踪延迟通常在 5-15 个交易日,能够捕捉到 β 的中期漂移(财报、指数调整),同时过滤短期噪声。
五、生产级数据获取与回测框架
5.1 TickDB 历史 K 线数据获取
配对筛选需要长时间跨度的清洗过的价格数据。以下代码演示如何从 TickDB 获取对齐后的历史 K 线:
import requests
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class TickDBClient:
"""
TickDB API 客户端
用于获取历史 K 线数据,进行配对策略回测
注意:
- trades 接口不支持美股和 A 股
- 使用 /v1/market/kline 获取历史 K 线数据
"""
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
self.base_url = "https://api.tickdb.ai/v1"
self.headers = {"X-API-Key": self.api_key}
def _request_with_retry(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
max_retries: int = 3
) -> Dict:
"""
带重试机制的 API 请求
包含:
- 指数退避重连
- 限频处理(code: 3001)
- 超时设置
"""
url = f"{self.base_url}{endpoint}"
for attempt in range(max_retries):
try:
response = requests.request(
method,
url,
headers=self.headers,
params=params,
timeout=(3.05, 10) # (connect_timeout, read_timeout)
)
# 检查限频
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
print(f"触发限频,等待 {retry_after} 秒")
time.sleep(retry_after)
continue
result = response.json()
# 处理 TickDB 业务错误码
code = result.get("code", 0)
if code == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
print(f"API 限频 (code:3001),等待 {retry_after} 秒")
time.sleep(retry_after)
continue
elif code != 0:
raise RuntimeError(
f"API 错误 (code:{code}): {result.get('message', 'Unknown error')}"
)
return result
except requests.exceptions.Timeout:
print(f"请求超时(尝试 {attempt + 1}/{max_retries})")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 简单退避
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}(尝试 {attempt + 1}/{max_retries})")
if attempt == max_retries - 1:
raise
jitter = (hash(str(time.time())) % 100) / 500 # 0-0.2秒抖动
time.sleep(2 ** attempt + jitter)
raise RuntimeError("达到最大重试次数")
def get_kline(
self,
symbol: str,
interval: str = "1d",
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = 1000
) -> pd.DataFrame:
"""
获取 K 线数据
参数:
symbol: 交易品种,如 'AAPL.US'
interval: K 线周期,如 '1d', '1h', '15m'
start_time: 开始时间戳(毫秒)
end_time: 结束时间戳(毫秒)
limit: 每次请求的最大条数
返回:
DataFrame: 包含 timestamp, open, high, low, close, volume
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start"] = start_time
if end_time:
params["end"] = end_time
result = self._request_with_retry("GET", "/market/kline", params=params)
data = result.get("data", [])
if not data:
return pd.DataFrame()
# 转换为 DataFrame
df = pd.DataFrame(data)
df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
df = df[['timestamp', 'o', 'h', 'l', 'c', 'v']]
df.columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
return df
def get_aligned_klines(
self,
symbols: List[str],
start_date: str,
end_date: str,
interval: str = "1d"
) -> Dict[str, pd.DataFrame]:
"""
获取多个标的的对齐 K 线数据
自动对齐时间索引,处理停牌日
参数:
symbols: 交易品种列表
start_date: 开始日期 'YYYY-MM-DD'
end_date: 结束日期 'YYYY-MM-DD'
返回:
Dict[str, DataFrame]: symbol -> DataFrame
"""
start_ts = int(pd.Timestamp(start_date).timestamp() * 1000)
end_ts = int(pd.Timestamp(end_date).timestamp() * 1000)
results = {}
for symbol in symbols:
print(f"获取 {symbol} K 线数据...")
df = self.get_kline(
symbol=symbol,
interval=interval,
start_time=start_ts,
end_time=end_ts
)
if len(df) > 0:
df.set_index('timestamp', inplace=True)
results[symbol] = df
else:
print(f"警告: {symbol} 无数据")
# 避免触发限频
time.sleep(0.2)
# 对齐:取所有标的的交易日并集
if not results:
return {}
all_dates = results[list(results.keys())[0]].index
for symbol, df in results.items():
all_dates = all_dates.union(df.index)
# 补齐停牌日(forward fill)
aligned = {}
for symbol, df in results.items():
aligned_df = df.reindex(all_dates)
aligned_df = aligned_df.fillna(method='ffill')
aligned[symbol] = aligned_df
return aligned
def run_pair_backtest(
client: TickDBClient,
symbol_x: str,
symbol_y: str,
start_date: str,
end_date: str,
z_entry: float = 2.0,
z_exit: float = 0.5,
lookback_window: int = 60
) -> pd.DataFrame:
"""
配对策略回测
使用卡尔曼滤波动态估计 hedge ratio
参数:
client: TickDBClient 实例
symbol_x: 标的 X
symbol_y: 标的 Y
start_date: 回测开始日期
end_date: 回测结束日期
z_entry: 入场 z-score 阈值
z_exit: 出场 z-score 阈值
lookback_window: 初始 warmup 窗口
返回:
DataFrame: 回测每日状态
"""
# 获取对齐数据
aligned = client.get_aligned_klines(
symbols=[symbol_x, symbol_y],
start_date=start_date,
end_date=end_date
)
if symbol_x not in aligned or symbol_y not in aligned:
raise ValueError(f"无法获取 {symbol_x} 或 {symbol_y} 的数据")
df_x = aligned[symbol_x]['close']
df_y = aligned[symbol_y]['close']
# 初始化卡尔曼滤波器
kf = KalmanFilterPair(delta=1e-4, Ve=1e-3)
# 回测记录
trades = []
position = None # 'long_spread' or 'short_spread' or None
entry_z = 0.0
entry_spread = 0.0
daily_stats = []
for i in range(lookback_window, len(df_x)):
x = df_x.iloc[i]
y = df_y.iloc[i]
date = df_x.index[i]
# 更新卡尔曼滤波
state = kf.update(x, y)
# Warmup 期结束后开始交易
if i < lookback_window:
continue
# 交易逻辑
current_z = state.z_score
if position is None:
# 无持仓,检查入场信号
if current_z > z_entry:
position = 'short_spread'
entry_z = current_z
entry_spread = state.spread_std
elif current_z < -z_entry:
position = 'long_spread'
entry_z = current_z
entry_spread = state.spread_std
else:
# 有持仓,检查出场信号
should_exit = (
(position == 'long_spread' and current_z > -z_exit) or
(position == 'short_spread' and current_z < z_exit) or
# 止损:z-score 回归后再次扩大(趋势延续信号)
(position == 'long_spread' and current_z < entry_z - 1.0) or
(position == 'short_spread' and current_z > entry_z + 1.0)
)
if should_exit:
pnl = (
(current_z - entry_z) if position == 'long_spread'
else (entry_z - current_z)
)
trades.append({
'entry_date': trades[-1]['exit_date'] if trades else df_x.index[i-1],
'exit_date': date,
'direction': position,
'entry_z': entry_z,
'exit_z': current_z,
'pnl_z': pnl,
'pnl_pct': pnl / abs(entry_z) * 100 # 简化估算
})
position = None
daily_stats.append({
'date': date,
'price_x': x,
'price_y': y,
'beta': state.beta,
'spread': df_x.iloc[i] - state.beta * df_y.iloc[i],
'z_score': current_z,
'position': position,
'kalman_gain': state.P
})
return pd.DataFrame(daily_stats), pd.DataFrame(trades)
# 使用示例
if __name__ == "__main__":
# 初始化客户端
client = TickDBClient()
# 运行回测(以苹果 vs 标普 500 ETF 为例)
stats, trades = run_pair_backtest(
client=client,
symbol_x="AAPL.US",
symbol_y="SPY.US",
start_date="2023-01-01",
end_date="2024-01-01",
z_entry=2.0,
z_exit=0.5
)
if len(trades) > 0:
print(f"总交易次数: {len(trades)}")
print(f"胜率: {(trades['pnl_z'] > 0).mean():.2%}")
print(f"平均盈利(z-score): {trades.loc[trades['pnl_z'] > 0, 'pnl_z'].mean():.2f}")
print(f"平均亏损(z-score): {trades.loc[trades['pnl_z'] < 0, 'pnl_z'].mean():.2f}")
⚠️ 生产环境提示:上述代码演示了基本架构。实际回测需添加:资金管理模块、滑点模拟、佣金计算、仓位控制。TickDB 的
/v1/market/kline接口支持最长 10 年级别的历史 K 线数据,适用于长周期配对策略的验证。
六、配对筛选的工程架构
6.1 全流程流水线设计
从海量股票中筛选可套利配对,需要一套自动化的工程流水线:
┌─────────────────────────────────────────────────────────────────┐
│ L1: 数据准备层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ TickDB K线 │→ │ 停牌对齐 │→ │ 流动性过滤 │ │
│ │ 历史数据 │ │ 前复权处理 │ │ ADV > $5M │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ L2: 统计筛选层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Pearson相关 │→ │ Engle-Granger│→ │ 滚动稳定性 │ │
│ │ r > 0.7 │ │ p < 0.05 │ │ 协整通过>70% │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ L3: 动力学筛选层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ OU半衰期估计 │→ │ 卡尔曼β跟踪 │→ │ 模拟交易验证 │ │
│ │ 5-60天理想 │ │ 波动率监控 │ │ 夏普>1.0 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ L4: 候选池输出 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 配对候选列表 │ │ 参数配置文件 │ │ 每日监控任务 │ │
│ │ (Top 50) │ │ 阈值/窗口 │ │ 信号推送 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
6.2 阈值配置的分场景建议
| 场景 | 半衰期范围 | z_entry | z_exit | lookback | 说明 |
|---|---|---|---|---|---|
| 日内统计套利 | 5-30 分钟 | 1.5 | 0.3 | 20 | 高频监控,滑点敏感 |
| 短期配对交易 | 5-20 天 | 2.0 | 0.5 | 60 | 最常见配置 |
| 中期事件驱动 | 20-60 天 | 2.5 | 0.75 | 120 | 容忍更多噪声 |
| 长周期宏观对冲 | 60-180 天 | 3.0 | 1.0 | 252 | 低频,低夏普,高容量 |
七、常见陷阱与工程预警
7.1 统计陷阱
| 陷阱 | 症状 | 解决方案 |
|---|---|---|
| 过度拟合 | 样本内夏普 5.0,样本外 0.3 | 使用 walk-forward 验证,外推检验 |
| 前视偏差 | 回测盈利,实盘亏损 | 严格使用历史数据,区分入场/出场信号 |
| 幸存者偏差 | 只选当前存在的股票 | 使用包含已退市股票的完整历史数据库 |
| 协整幻觉 | 全量数据协整,滚动窗口不稳定 | 必须做滚动窗口稳定性检验 |
7.2 工程陷阱
| 陷阱 | 症状 | 解决方案 |
|---|---|---|
| 停牌未对齐 | 计算的 spread 出现巨大跳变 | 使用前复权数据 + 时间索引对齐 |
| 流动性断层 | 价差信号触发但无法成交 | 加入 ADV 过滤 + 滑点预估 |
| API 限频雪崩 | 回测启动后频繁 429 | 使用异步请求 + 令牌桶限流 |
| 时区混淆 | 盘前盘后数据错位 | 统一使用 UTC 或交易所本地时间 |
7.3 卡尔曼滤波的调参建议
# 参数敏感性分析
param_grid = {
'delta': [1e-5, 1e-4, 1e-3], # 状态转移方差
'Ve': [1e-4, 1e-3, 1e-2] # 观测噪声方差
}
# 经验法则
# delta 大 → β 跟踪更灵敏,但也更噪声
# Ve 大 → 更信任观测值(价格),β 更新更快
# 推荐起始值:delta=1e-4, Ve=1e-3
# 微调方向:
# - 如果 β 响应太慢(OLS-like),降低 Ve
# - 如果 β 噪声太大,降低 delta
八、结论与下一步行动
8.1 核心结论
协整检验是必要条件,不是充分条件。 一个配对通过协整检验,只能说明历史上存在均值回归关系。要将其转化为可盈利的策略,还需要:
- 滚动窗口稳定性:协整关系必须跨时段成立,而不是仅在某一年的样本内成立
- 半衰期合理性:均值回归速度必须与你的持仓周期匹配,理想区间是持仓周期的 0.5-2 倍
- 动态 β 追踪:卡尔曼滤波能够在不重新回归的情况下,实时更新 hedge ratio,避免固定 β 带来的系统性偏差
- 完整的回测验证:包括滑点、佣金、止损机制,以及 walk-forward 外推检验
8.2 下一步行动
如果你希望亲手实现配对筛选流水线:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key,设置环境变量
TICKDB_API_KEY - 下载本文代码,从两只你最熟悉的相关股票开始回测
- 用滚动窗口检验协整稳定性,观察卡尔曼 β 的漂移
如果你需要 10 年级别的历史 K 线数据(覆盖至少一个完整的牛熊周期),联系 [email protected] 了解机构方案。
如果你正在用其他数据源做配对研究,本文的协整检验和卡尔曼滤波模块可无缝替换为任何价格数据源。
回测局限性说明:上述回测结果基于 TickDB 历史 K 线数据模拟,不构成未来收益保证。回测中存在以下局限性:未完全模拟实际交易中的滑点和市场冲击成本(已假设固定滑点模拟);未考虑极端行情下的流动性枯竭风险;卡尔曼滤波参数(delta、Ve)基于经验选择,未做完整网格优化;样本量有限,统计显著性可能不足。建议在实际使用前进行更长时间跨度的 walk-forward 验证,并在实盘前进行模拟盘测试。
本文不构成任何投资建议。市场有风险,投资需谨慎。